Merge pull request #10250 from mxpv/grpc

Bump GRPC to 1.64
This commit is contained in:
Maksym Pavlenko 2024-07-22 05:15:36 +00:00 committed by GitHub
commit 01a2b6fd86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 931 additions and 754 deletions

View File

@ -21,11 +21,14 @@ import (
"context"
"encoding/json"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/grpc/resolver"
containersapi "github.com/containerd/containerd/api/services/containers/v1"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
@ -79,6 +82,14 @@ func init() {
typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources")
typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources")
typeurl.Register(&features.Features{}, prefix, "opencontainers/runtime-spec", major, "features", "Features")
if runtime.GOOS == "windows" {
// After bumping GRPC to 1.64, Windows tests started failing with: "name resolver error: produced zero addresses".
// This is happening because grpc.NewClient uses DNS resolver by default, which apparently not what we want
// when using socket paths on Windows.
// Using a workaround from https://github.com/grpc/grpc-go/issues/1786#issuecomment-2119088770
resolver.SetDefaultScheme("passthrough")
}
}
// New returns a new containerd client that is connected to the containerd
@ -115,17 +126,14 @@ func New(address string, opts ...Opt) (*Client, error) {
}
if address != "" {
backoffConfig := backoff.DefaultConfig
backoffConfig.MaxDelay = 3 * time.Second
backoffConfig.MaxDelay = copts.timeout
connParams := grpc.ConnectParams{
Backoff: backoffConfig,
}
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.FailOnNonTempDialError(true),
grpc.WithConnectParams(connParams),
grpc.WithContextDialer(dialer.ContextDialer),
grpc.WithReturnConnectionError(),
}
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
@ -143,9 +151,7 @@ func New(address string, opts ...Opt) (*Client, error) {
}
connector := func() (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), copts.timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
conn, err := grpc.NewClient(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to dial %q: %w", address, err)
}

View File

@ -100,15 +100,11 @@ func connect(address string, d func(context.Context, string) (net.Conn, error))
Backoff: backoffConfig,
}
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(d),
grpc.FailOnNonTempDialError(true),
grpc.WithConnectParams(connParams),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
conn, err := grpc.NewClient(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to dial %q: %w", address, err)
}

View File

@ -587,7 +587,7 @@ func (pc *proxyClients) getClient(address string) (*grpc.ClientConn, error) {
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
}
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
conn, err := grpc.NewClient(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to dial %q: %w", address, err)
}

View File

@ -275,14 +275,10 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam
return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
return grpcDialContext(ctx, params.Address, onClose, gopts...)
return grpcDialContext(params.Address, onClose, gopts...)
default:
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol)
}
@ -292,7 +288,6 @@ func makeConnection(ctx context.Context, id string, params client.BootstrapParam
// so we can have something similar to ttrpc.WithOnClose to have
// a callback run when the connection is severed or explicitly closed.
func grpcDialContext(
ctx context.Context,
address string,
onClose func(),
gopts ...grpc.DialOption,
@ -316,7 +311,7 @@ func grpcDialContext(
conn.Close()
target := dialer.DialAddress(address)
client, err := grpc.DialContext(ctx, target, gopts...)
client, err := grpc.NewClient(target, gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}

8
go.mod
View File

@ -72,8 +72,8 @@ require (
golang.org/x/mod v0.19.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.22.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be
google.golang.org/grpc v1.63.2
google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
@ -131,12 +131,12 @@ require (
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

18
go.sum
View File

@ -397,8 +397,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -472,19 +472,17 @@ google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJ
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo=
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0=
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4=
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 h1:Q2RxlXqh1cgzzUgV261vBO2jI5R/3DD1J2pM0nI4NhU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@ -245,14 +245,8 @@ func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error)
return stream, nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, realAddress, gopts...)
gopts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
conn, err := grpc.NewClient(realAddress, gopts...)
if err != nil {
return nil, err
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Google LLC
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,7 +15,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.9
// protoc v4.24.4
// source: google/rpc/code.proto
package code

View File

@ -1,4 +1,4 @@
// Copyright 2022 Google LLC
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,7 +15,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.9
// protoc v4.24.4
// source: google/rpc/error_details.proto
package errdetails

View File

@ -1,4 +1,4 @@
// Copyright 2022 Google LLC
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,7 +15,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.9
// protoc v4.24.4
// source: google/rpc/status.proto
package status

View File

@ -66,7 +66,7 @@ How to get your contributions merged smoothly and quickly.
- **All tests need to be passing** before your change can be merged. We
recommend you **run tests locally** before creating your PR to catch breakages
early on.
- `VET_SKIP_PROTO=1 ./vet.sh` to catch vet errors
- `./scripts/vet.sh` to catch vet errors
- `go test -cpu 1,4 -timeout 7m ./...` to run the tests
- `go test -race -cpu 1,4 -timeout 7m ./...` to run tests in race mode

View File

@ -9,6 +9,7 @@ for general contribution guidelines.
## Maintainers (in alphabetical order)
- [atollena](https://github.com/atollena), Datadog, Inc.
- [cesarghali](https://github.com/cesarghali), Google LLC
- [dfawley](https://github.com/dfawley), Google LLC
- [easwars](https://github.com/easwars), Google LLC

View File

@ -30,17 +30,20 @@ testdeps:
GO111MODULE=on go get -d -v -t google.golang.org/grpc/...
vet: vetdeps
./vet.sh
./scripts/vet.sh
vetdeps:
./vet.sh -install
./scripts/vet.sh -install
.PHONY: \
all \
build \
clean \
deps \
proto \
test \
testsubmodule \
testrace \
testdeps \
vet \
vetdeps

View File

@ -18,7 +18,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc-gen-go v1.33.0
// protoc v4.25.2
// source: grpc/binlog/v1/binarylog.proto

View File

@ -37,7 +37,6 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@ -121,8 +120,9 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// https://github.com/grpc/grpc/blob/master/doc/naming.md. e.g. to use dns
// resolver, a "dns:///" prefix should be applied to the target.
//
// The DialOptions returned by WithBlock, WithTimeout, and
// WithReturnConnectionError are ignored by this function.
// The DialOptions returned by WithBlock, WithTimeout,
// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
@ -196,6 +196,8 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
// Dial calls DialContext(context.Background(), target, opts...).
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
@ -209,6 +211,8 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
// At the end of this method, we kick the channel out of idle, rather than
// waiting for the first rpc.
@ -838,6 +842,9 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Start with our address set to the first address; this may be updated if
// we connect to different addresses.
ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel created",
@ -929,10 +936,14 @@ func equalAddresses(a, b []resolver.Address) bool {
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
ac.mu.Lock()
channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
addrs = copyAddressesWithoutBalancerAttributes(addrs)
limit := len(addrs)
if limit > 5 {
limit = 5
}
channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])
ac.mu.Lock()
if equalAddresses(ac.addrs, addrs) {
ac.mu.Unlock()
return
@ -1167,6 +1178,10 @@ type addrConn struct {
// is received, transport is closed, ac has been torn down).
transport transport.ClientTransport // The current transport.
// This mutex is used on the RPC path, so its usage should be minimized as
// much as possible.
// TODO: Find a lock-free way to retrieve the transport and state from the
// addrConn.
mu sync.Mutex
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@ -1292,6 +1307,7 @@ func (ac *addrConn) resetTransport() {
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
var firstConnErr error
for _, addr := range addrs {
ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
if ctx.Err() != nil {
return errConnClosing
}
@ -1739,7 +1755,7 @@ func encodeAuthority(authority string) string {
return false
case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=': // Subdelim characters
return false
case ':', '[', ']', '@': // Authority related delimeters
case ':', '[', ']', '@': // Authority related delimiters
return false
}
// Everything else must be escaped.

View File

@ -1,17 +0,0 @@
#!/usr/bin/env bash
# This script serves as an example to demonstrate how to generate the gRPC-Go
# interface and the related messages from .proto file.
#
# It assumes the installation of i) Google proto buffer compiler at
# https://github.com/google/protobuf (after v2.6.1) and ii) the Go codegen
# plugin at https://github.com/golang/protobuf (after 2015-02-20). If you have
# not, please install them first.
#
# We recommend running this script at $GOPATH/src.
#
# If this is not what you need, feel free to make your own scripts. Again, this
# script is for demonstration purpose.
#
proto=$1
protoc --go_out=plugins=grpc:. $proto

View File

@ -235,7 +235,7 @@ func (c *Code) UnmarshalJSON(b []byte) error {
if ci, err := strconv.ParseUint(string(b), 10, 32); err == nil {
if ci >= _maxCode {
return fmt.Errorf("invalid code: %q", ci)
return fmt.Errorf("invalid code: %d", ci)
}
*c = Code(ci)

View File

@ -30,7 +30,7 @@ import (
"google.golang.org/grpc/attributes"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/protobuf/protoadapt"
"google.golang.org/protobuf/proto"
)
// PerRPCCredentials defines the common interface for the credentials which need to
@ -237,7 +237,7 @@ func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo {
}
// CheckSecurityLevel checks if a connection's security level is greater than or equal to the specified one.
// It returns success if 1) the condition is satisified or 2) AuthInfo struct does not implement GetCommonAuthInfo() method
// It returns success if 1) the condition is satisfied or 2) AuthInfo struct does not implement GetCommonAuthInfo() method
// or 3) CommonAuthInfo.SecurityLevel has an invalid zero value. For 2) and 3), it is for the purpose of backward-compatibility.
//
// This API is experimental.
@ -287,5 +287,5 @@ type ChannelzSecurityValue interface {
type OtherChannelzSecurityValue struct {
ChannelzSecurityValue
Name string
Value protoadapt.MessageV1
Value proto.Message
}

View File

@ -300,6 +300,9 @@ func withBackoff(bs internalbackoff.Strategy) DialOption {
//
// Use of this feature is not recommended. For more information, please see:
// https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
//
// Deprecated: this DialOption is not supported by NewClient.
// Will be supported throughout 1.x.
func WithBlock() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.block = true
@ -314,10 +317,8 @@ func WithBlock() DialOption {
// Use of this feature is not recommended. For more information, please see:
// https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
// Deprecated: this DialOption is not supported by NewClient.
// Will be supported throughout 1.x.
func WithReturnConnectionError() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.block = true
@ -387,8 +388,8 @@ func WithCredentialsBundle(b credentials.Bundle) DialOption {
// WithTimeout returns a DialOption that configures a timeout for dialing a
// ClientConn initially. This is valid if and only if WithBlock() is present.
//
// Deprecated: use DialContext instead of Dial and context.WithTimeout
// instead. Will be supported throughout 1.x.
// Deprecated: this DialOption is not supported by NewClient.
// Will be supported throughout 1.x.
func WithTimeout(d time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.timeout = d
@ -470,9 +471,8 @@ func withBinaryLogger(bl binarylog.Logger) DialOption {
// Use of this feature is not recommended. For more information, please see:
// https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// Deprecated: this DialOption is not supported by NewClient.
// This API may be changed or removed in a
// later release.
func FailOnNonTempDialError(f bool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
@ -601,12 +601,22 @@ func WithDisableRetry() DialOption {
})
}
// MaxHeaderListSizeDialOption is a DialOption that specifies the maximum
// (uncompressed) size of header list that the client is prepared to accept.
type MaxHeaderListSizeDialOption struct {
MaxHeaderListSize uint32
}
func (o MaxHeaderListSizeDialOption) apply(do *dialOptions) {
do.copts.MaxHeaderListSize = &o.MaxHeaderListSize
}
// WithMaxHeaderListSize returns a DialOption that specifies the maximum
// (uncompressed) size of header list that the client is prepared to accept.
func WithMaxHeaderListSize(s uint32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.MaxHeaderListSize = &s
})
return MaxHeaderListSizeDialOption{
MaxHeaderListSize: s,
}
}
// WithDisableHealthCheck disables the LB channel health checking for all
@ -648,7 +658,7 @@ func defaultDialOptions() dialOptions {
}
}
// withGetMinConnectDeadline specifies the function that clientconn uses to
// withMinConnectDeadline specifies the function that clientconn uses to
// get minConnectDeadline. This can be used to make connection attempts happen
// faster/slower.
//

View File

@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc-gen-go v1.33.0
// protoc v4.25.2
// source: grpc/health/v1/health.proto

View File

@ -32,8 +32,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
Health_Check_FullMethodName = "/grpc.health.v1.Health/Check"
@ -81,8 +81,9 @@ func NewHealthClient(cc grpc.ClientConnInterface) HealthClient {
}
func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -90,11 +91,12 @@ func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts .
}
func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) {
stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &healthWatchClient{stream}
x := &healthWatchClient{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
@ -198,7 +200,7 @@ func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(HealthServer).Watch(m, &healthWatchServer{stream})
return srv.(HealthServer).Watch(m, &healthWatchServer{ServerStream: stream})
}
type Health_WatchServer interface {

View File

@ -75,7 +75,6 @@ func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
if err != nil {
return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err)
}
return &lbConfig{childBuilder: builder, childConfig: cfg}, nil
}

View File

@ -169,7 +169,6 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
gsbCfg, ok := state.BalancerConfig.(*lbConfig)
if ok {
// Switch to the child in the config unless it is already active.

View File

@ -65,7 +65,7 @@ type TruncatingMethodLogger struct {
callID uint64
idWithinCallGen *callIDGenerator
sink Sink // TODO(blog): make this plugable.
sink Sink // TODO(blog): make this pluggable.
}
// NewTruncatingMethodLogger returns a new truncating method logger.
@ -80,7 +80,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
callID: idGen.next(),
idWithinCallGen: &callIDGenerator{},
sink: DefaultSink, // TODO(blog): make it plugable.
sink: DefaultSink, // TODO(blog): make it pluggable.
}
}
@ -397,7 +397,7 @@ func metadataKeyOmit(key string) bool {
switch key {
case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
return true
case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
case "grpc-trace-bin": // grpc-trace-bin is special because it's visible to users.
return false
}
return strings.HasPrefix(key, "grpc-")

View File

@ -28,9 +28,6 @@ import (
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = boolFromEnv("GRPC_GO_IGNORE_TXT_ERRORS", true)
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = boolFromEnv("GRPC_GO_ADVERTISE_COMPRESSORS", true)
// RingHashCap indicates the maximum ring size which defaults to 4096
// entries but may be overridden by setting the environment variable
// "GRPC_RING_HASH_CAP". This does not override the default bounds

View File

@ -20,8 +20,6 @@ package grpcutil
import (
"strings"
"google.golang.org/grpc/internal/envconfig"
)
// RegisteredCompressorNames holds names of the registered compressors.
@ -40,8 +38,5 @@ func IsCompressorNameRegistered(name string) bool {
// RegisteredCompressors returns a string of registered compressor names
// separated by comma.
func RegisteredCompressors() string {
if !envconfig.AdvertiseCompressors {
return ""
}
return strings.Join(RegisteredCompressorNames, ",")
}

View File

@ -41,18 +41,24 @@ import (
"google.golang.org/grpc/serviceconfig"
)
// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
// addresses from SRV records. Must not be changed after init time.
var EnableSRVLookups = false
var (
// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
// addresses from SRV records. Must not be changed after init time.
EnableSRVLookups = false
// ResolvingTimeout specifies the maximum duration for a DNS resolution request.
// If the timeout expires before a response is received, the request will be canceled.
//
// It is recommended to set this value at application startup. Avoid modifying this variable
// after initialization as it's not thread-safe for concurrent modification.
var ResolvingTimeout = 30 * time.Second
// MinResolutionInterval is the minimum interval at which re-resolutions are
// allowed. This helps to prevent excessive re-resolution.
MinResolutionInterval = 30 * time.Second
var logger = grpclog.Component("dns")
// ResolvingTimeout specifies the maximum duration for a DNS resolution request.
// If the timeout expires before a response is received, the request will be canceled.
//
// It is recommended to set this value at application startup. Avoid modifying this variable
// after initialization as it's not thread-safe for concurrent modification.
ResolvingTimeout = 30 * time.Second
logger = grpclog.Component("dns")
)
func init() {
resolver.Register(NewBuilder())
@ -208,7 +214,7 @@ func (d *dnsResolver) watcher() {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
waitTime = internal.MinResolutionRate
waitTime = MinResolutionInterval
select {
case <-d.ctx.Done():
return

View File

@ -28,7 +28,7 @@ import (
// NetResolver groups the methods on net.Resolver that are used by the DNS
// resolver implementation. This allows the default net.Resolver instance to be
// overidden from tests.
// overridden from tests.
type NetResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
@ -50,10 +50,6 @@ var (
// The following vars are overridden from tests.
var (
// MinResolutionRate is the minimum rate at which re-resolutions are
// allowed. This helps to prevent excessive re-resolution.
MinResolutionRate = 30 * time.Second
// TimeAfterFunc is used by the DNS resolver to wait for the given duration
// to elapse. In non-test code, this is implemented by time.After. In test
// code, this can be used to control the amount of time the resolver is

View File

@ -193,7 +193,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
closeConn error // if set, loopyWriter will exit with this error
}
func (*goAway) isTransportResponseFrame() bool { return false }
@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {
return err
}
func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@ -344,7 +344,7 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err
return false, c.err
}
if f != nil {
if !f(it) { // f wasn't successful
if !f() { // f wasn't successful
c.mu.Unlock()
return false, nil
}
@ -495,21 +495,22 @@ type loopyWriter struct {
ssGoAwayHandler func(*goAway) (bool, error)
}
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
}
return l
}

View File

@ -114,11 +114,11 @@ type http2Client struct {
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
nextID uint32
state transportState
activeStreams map[uint32]*Stream
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
@ -408,10 +408,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
// writerDone should be closed since the loopy goroutine
// wouldn't have started in the case this function returns an error.
close(t.writerDone)
t.Close(err)
}
}()
@ -458,8 +458,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// Block until the server preface is received successfully or an error occurs.
if err = <-readerErrCh; err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
@ -517,6 +521,17 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConn
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
@ -781,7 +796,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it any) bool {
checkForStreamQuota := func() bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@ -793,23 +808,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
t.waitingStreams--
}
t.streamQuota--
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
hdr.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = hdr.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
case t.streamsQuotaAvailable <- struct{}{}:
@ -819,13 +835,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
checkForHeaderListSize := func(it any) bool {
checkForHeaderListSize := func() bool {
if t.maxSendHeaderListSize == nil {
return true
}
hdrFrame := it.(*headerFrame)
var sz int64
for _, f := range hdrFrame.hf {
for _, f := range hdr.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
return false
@ -834,8 +849,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
success, err := t.controlBuf.executeAndPut(func(it any) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
success, err := t.controlBuf.executeAndPut(func() bool {
return checkForHeaderListSize() && checkForStreamQuota()
}, hdr)
if err != nil {
// Connection closed.
@ -946,7 +961,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
rst: rst,
rstCode: rstCode,
}
addBackStreamQuota := func(any) bool {
addBackStreamQuota := func() bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@ -966,7 +981,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
// accessed anymore.
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only close once.
@ -991,7 +1006,10 @@ func (t *http2Client) Close(err error) {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
<-t.writerDone
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelz.ID)
@ -1099,7 +1117,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
updateIWS := func(any) bool {
updateIWS := func() bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@ -1252,7 +1270,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}

View File

@ -330,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
@ -860,7 +859,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@ -1014,12 +1013,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
hf := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
onWrite: t.setResetPingStrikes,
})
}
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
if !success {
if err != nil {
return err
@ -1208,7 +1208,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
return
}
if !outstandingPing {

View File

@ -304,7 +304,7 @@ func (s *Stream) isHeaderSent() bool {
}
// updateHeaderSent updates headerSent and returns true
// if it was alreay set. It is valid only on server-side.
// if it was already set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}

View File

@ -90,6 +90,21 @@ func Pairs(kv ...string) MD {
return md
}
// String implements the Stringer interface for pretty-printing a MD.
// Ordering of the values is non-deterministic as it ranges over a map.
func (md MD) String() string {
var sb strings.Builder
fmt.Fprintf(&sb, "MD{")
for k, v := range md {
if sb.Len() > 3 {
fmt.Fprintf(&sb, ", ")
}
fmt.Fprintf(&sb, "%s=[%s]", k, strings.Join(v, ", "))
}
fmt.Fprintf(&sb, "}")
return sb.String()
}
// Len returns the number of items in md.
func (md MD) Len() int {
return len(md)

View File

@ -22,7 +22,9 @@ package peer
import (
"context"
"fmt"
"net"
"strings"
"google.golang.org/grpc/credentials"
)
@ -39,6 +41,34 @@ type Peer struct {
AuthInfo credentials.AuthInfo
}
// String ensures the Peer types implements the Stringer interface in order to
// allow to print a context with a peerKey value effectively.
func (p *Peer) String() string {
if p == nil {
return "Peer<nil>"
}
sb := &strings.Builder{}
sb.WriteString("Peer{")
if p.Addr != nil {
fmt.Fprintf(sb, "Addr: '%s', ", p.Addr.String())
} else {
fmt.Fprintf(sb, "Addr: <nil>, ")
}
if p.LocalAddr != nil {
fmt.Fprintf(sb, "LocalAddr: '%s', ", p.LocalAddr.String())
} else {
fmt.Fprintf(sb, "LocalAddr: <nil>, ")
}
if p.AuthInfo != nil {
fmt.Fprintf(sb, "AuthInfo: '%s'", p.AuthInfo.AuthType())
} else {
fmt.Fprintf(sb, "AuthInfo: <nil>")
}
sb.WriteString("}")
return sb.String()
}
type peerKey struct{}
// NewContext creates a new context with peer information attached.

View File

@ -20,6 +20,7 @@ package grpc
import (
"context"
"fmt"
"io"
"sync"
@ -117,7 +118,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if lastPickErr != nil {
errStr = "latest balancer error: " + lastPickErr.Error()
} else {
errStr = ctx.Err().Error()
errStr = fmt.Sprintf("received context error while waiting for new LB policy update: %s", ctx.Err().Error())
}
switch ctx.Err() {
case context.DeadlineExceeded:

View File

@ -54,7 +54,7 @@ type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
// If set to true, instructs the LB policy to shuffle the order of the list
// of addresses received from the name resolver before attempting to
// of endpoints received from the name resolver before attempting to
// connect to them.
ShuffleAddressList bool `json:"shuffleAddressList"`
}
@ -94,8 +94,7 @@ func (b *pickfirstBalancer) ResolverError(err error) {
}
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
addrs := state.ResolverState.Addresses
if len(addrs) == 0 {
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
@ -107,22 +106,49 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// We don't have to guard this block with the env var because ParseConfig
// already does so.
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
if b.logger.V(2) {
b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
}
var addrs []resolver.Address
if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
// Perform the optional shuffling described in gRFC A62. The shuffling will
// change the order of endpoints but not touch the order of the addresses
// within each endpoint. - A61
if cfg.ShuffleAddressList {
endpoints = append([]resolver.Endpoint{}, endpoints...)
grpcrand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
}
// "Flatten the list by concatenating the ordered list of addresses for each
// of the endpoints, in order." - A61
for _, endpoint := range endpoints {
// "In the flattened list, interleave addresses from the two address
// families, as per RFC-8304 section 4." - A61
// TODO: support the above language.
addrs = append(addrs, endpoint.Addresses...)
}
} else {
// Endpoints not set, process addresses until we migrate resolver
// emissions fully to Endpoints. The top channel does wrap emitted
// addresses with endpoints, however some balancers such as weighted
// target do not forwarrd the corresponding correct endpoints down/split
// endpoints properly. Once all balancers correctly forward endpoints
// down, can delete this else conditional.
addrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
}
if b.subConn != nil {
b.cc.UpdateAddresses(b.subConn, addrs)
return nil

View File

@ -19,10 +19,11 @@
package reflection
import (
"google.golang.org/grpc/reflection/internal"
v1reflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1reflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1alphareflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
v1alphareflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
// asV1Alpha returns an implementation of the v1alpha version of the reflection
@ -44,7 +45,7 @@ type v1AlphaServerStreamAdapter struct {
}
func (s v1AlphaServerStreamAdapter) Send(response *v1reflectionpb.ServerReflectionResponse) error {
return s.ServerReflection_ServerReflectionInfoServer.Send(v1ToV1AlphaResponse(response))
return s.ServerReflection_ServerReflectionInfoServer.Send(internal.V1ToV1AlphaResponse(response))
}
func (s v1AlphaServerStreamAdapter) Recv() (*v1reflectionpb.ServerReflectionRequest, error) {
@ -52,136 +53,5 @@ func (s v1AlphaServerStreamAdapter) Recv() (*v1reflectionpb.ServerReflectionRequ
if err != nil {
return nil, err
}
return v1AlphaToV1Request(resp), nil
}
func v1ToV1AlphaResponse(v1 *v1reflectionpb.ServerReflectionResponse) *v1alphareflectionpb.ServerReflectionResponse {
var v1alpha v1alphareflectionpb.ServerReflectionResponse
v1alpha.ValidHost = v1.ValidHost
if v1.OriginalRequest != nil {
v1alpha.OriginalRequest = v1ToV1AlphaRequest(v1.OriginalRequest)
}
switch mr := v1.MessageResponse.(type) {
case *v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1alphareflectionpb.FileDescriptorResponse{
FileDescriptorProto: mr.FileDescriptorResponse.GetFileDescriptorProto(),
},
}
}
case *v1reflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &v1alphareflectionpb.ExtensionNumberResponse{
BaseTypeName: mr.AllExtensionNumbersResponse.GetBaseTypeName(),
ExtensionNumber: mr.AllExtensionNumbersResponse.GetExtensionNumber(),
},
}
}
case *v1reflectionpb.ServerReflectionResponse_ListServicesResponse:
if mr != nil {
svcs := make([]*v1alphareflectionpb.ServiceResponse, len(mr.ListServicesResponse.GetService()))
for i, svc := range mr.ListServicesResponse.GetService() {
svcs[i] = &v1alphareflectionpb.ServiceResponse{
Name: svc.GetName(),
}
}
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &v1alphareflectionpb.ListServiceResponse{
Service: svcs,
},
}
}
case *v1reflectionpb.ServerReflectionResponse_ErrorResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1alphareflectionpb.ErrorResponse{
ErrorCode: mr.ErrorResponse.GetErrorCode(),
ErrorMessage: mr.ErrorResponse.GetErrorMessage(),
},
}
}
default:
// no value set
}
return &v1alpha
}
func v1AlphaToV1Request(v1alpha *v1alphareflectionpb.ServerReflectionRequest) *v1reflectionpb.ServerReflectionRequest {
var v1 v1reflectionpb.ServerReflectionRequest
v1.Host = v1alpha.Host
switch mr := v1alpha.MessageRequest.(type) {
case *v1alphareflectionpb.ServerReflectionRequest_FileByFilename:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileByFilename{
FileByFilename: mr.FileByFilename,
}
case *v1alphareflectionpb.ServerReflectionRequest_FileContainingSymbol:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: mr.FileContainingSymbol,
}
case *v1alphareflectionpb.ServerReflectionRequest_FileContainingExtension:
if mr.FileContainingExtension != nil {
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &v1reflectionpb.ExtensionRequest{
ContainingType: mr.FileContainingExtension.GetContainingType(),
ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(),
},
}
}
case *v1alphareflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType{
AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType,
}
case *v1alphareflectionpb.ServerReflectionRequest_ListServices:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_ListServices{
ListServices: mr.ListServices,
}
default:
// no value set
}
return &v1
}
func v1ToV1AlphaRequest(v1 *v1reflectionpb.ServerReflectionRequest) *v1alphareflectionpb.ServerReflectionRequest {
var v1alpha v1alphareflectionpb.ServerReflectionRequest
v1alpha.Host = v1.Host
switch mr := v1.MessageRequest.(type) {
case *v1reflectionpb.ServerReflectionRequest_FileByFilename:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileByFilename{
FileByFilename: mr.FileByFilename,
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingSymbol:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: mr.FileContainingSymbol,
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingExtension:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &v1alphareflectionpb.ExtensionRequest{
ContainingType: mr.FileContainingExtension.GetContainingType(),
ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(),
},
}
}
case *v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType{
AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType,
}
}
case *v1reflectionpb.ServerReflectionRequest_ListServices:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_ListServices{
ListServices: mr.ListServices,
}
}
default:
// no value set
}
return &v1alpha
return internal.V1AlphaToV1Request(resp), nil
}

View File

@ -21,7 +21,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc-gen-go v1.33.0
// protoc v4.25.2
// source: grpc/reflection/v1/reflection.proto

View File

@ -36,8 +36,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
ServerReflection_ServerReflectionInfo_FullMethodName = "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo"
@ -61,11 +61,12 @@ func NewServerReflectionClient(cc grpc.ClientConnInterface) ServerReflectionClie
}
func (c *serverReflectionClient) ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (ServerReflection_ServerReflectionInfoClient, error) {
stream, err := c.cc.NewStream(ctx, &ServerReflection_ServiceDesc.Streams[0], ServerReflection_ServerReflectionInfo_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ServerReflection_ServiceDesc.Streams[0], ServerReflection_ServerReflectionInfo_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &serverReflectionServerReflectionInfoClient{stream}
x := &serverReflectionServerReflectionInfoClient{ClientStream: stream}
return x, nil
}
@ -120,7 +121,7 @@ func RegisterServerReflectionServer(s grpc.ServiceRegistrar, srv ServerReflectio
}
func _ServerReflection_ServerReflectionInfo_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ServerReflectionServer).ServerReflectionInfo(&serverReflectionServerReflectionInfoServer{stream})
return srv.(ServerReflectionServer).ServerReflectionInfo(&serverReflectionServerReflectionInfoServer{ServerStream: stream})
}
type ServerReflection_ServerReflectionInfoServer interface {

View File

@ -18,7 +18,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc-gen-go v1.33.0
// protoc v4.25.2
// grpc/reflection/v1alpha/reflection.proto is a deprecated file.

View File

@ -33,8 +33,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
ServerReflection_ServerReflectionInfo_FullMethodName = "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo"
@ -58,11 +58,12 @@ func NewServerReflectionClient(cc grpc.ClientConnInterface) ServerReflectionClie
}
func (c *serverReflectionClient) ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (ServerReflection_ServerReflectionInfoClient, error) {
stream, err := c.cc.NewStream(ctx, &ServerReflection_ServiceDesc.Streams[0], ServerReflection_ServerReflectionInfo_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ServerReflection_ServiceDesc.Streams[0], ServerReflection_ServerReflectionInfo_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &serverReflectionServerReflectionInfoClient{stream}
x := &serverReflectionServerReflectionInfoClient{ClientStream: stream}
return x, nil
}
@ -117,7 +118,7 @@ func RegisterServerReflectionServer(s grpc.ServiceRegistrar, srv ServerReflectio
}
func _ServerReflection_ServerReflectionInfo_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ServerReflectionServer).ServerReflectionInfo(&serverReflectionServerReflectionInfoServer{stream})
return srv.(ServerReflectionServer).ServerReflectionInfo(&serverReflectionServerReflectionInfoServer{ServerStream: stream})
}
type ServerReflection_ServerReflectionInfoServer interface {

View File

@ -0,0 +1,436 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package internal contains code that is shared by both reflection package and
// the test package. The packages are split in this way inorder to avoid
// depenedency to deprecated package github.com/golang/protobuf.
package internal
import (
"io"
"sort"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
v1reflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1reflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1alphareflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
v1alphareflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
// ServiceInfoProvider is an interface used to retrieve metadata about the
// services to expose.
type ServiceInfoProvider interface {
GetServiceInfo() map[string]grpc.ServiceInfo
}
// ExtensionResolver is the interface used to query details about extensions.
// This interface is satisfied by protoregistry.GlobalTypes.
type ExtensionResolver interface {
protoregistry.ExtensionTypeResolver
RangeExtensionsByMessage(message protoreflect.FullName, f func(protoreflect.ExtensionType) bool)
}
// ServerReflectionServer is the server API for ServerReflection service.
type ServerReflectionServer struct {
v1alphareflectiongrpc.UnimplementedServerReflectionServer
S ServiceInfoProvider
DescResolver protodesc.Resolver
ExtResolver ExtensionResolver
}
// FileDescWithDependencies returns a slice of serialized fileDescriptors in
// wire format ([]byte). The fileDescriptors will include fd and all the
// transitive dependencies of fd with names not in sentFileDescriptors.
func (s *ServerReflectionServer) FileDescWithDependencies(fd protoreflect.FileDescriptor, sentFileDescriptors map[string]bool) ([][]byte, error) {
if fd.IsPlaceholder() {
// If the given root file is a placeholder, treat it
// as missing instead of serializing it.
return nil, protoregistry.NotFound
}
var r [][]byte
queue := []protoreflect.FileDescriptor{fd}
for len(queue) > 0 {
currentfd := queue[0]
queue = queue[1:]
if currentfd.IsPlaceholder() {
// Skip any missing files in the dependency graph.
continue
}
if sent := sentFileDescriptors[currentfd.Path()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.Path()] = true
fdProto := protodesc.ToFileDescriptorProto(currentfd)
currentfdEncoded, err := proto.Marshal(fdProto)
if err != nil {
return nil, err
}
r = append(r, currentfdEncoded)
}
for i := 0; i < currentfd.Imports().Len(); i++ {
queue = append(queue, currentfd.Imports().Get(i))
}
}
return r, nil
}
// FileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result. The given symbol
// can be a type, a service or a method.
func (s *ServerReflectionServer) FileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
d, err := s.DescResolver.FindDescriptorByName(protoreflect.FullName(name))
if err != nil {
return nil, err
}
return s.FileDescWithDependencies(d.ParentFile(), sentFileDescriptors)
}
// FileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result.
func (s *ServerReflectionServer) FileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
xt, err := s.ExtResolver.FindExtensionByNumber(protoreflect.FullName(typeName), protoreflect.FieldNumber(extNum))
if err != nil {
return nil, err
}
return s.FileDescWithDependencies(xt.TypeDescriptor().ParentFile(), sentFileDescriptors)
}
// AllExtensionNumbersForTypeName returns all extension numbers for the given type.
func (s *ServerReflectionServer) AllExtensionNumbersForTypeName(name string) ([]int32, error) {
var numbers []int32
s.ExtResolver.RangeExtensionsByMessage(protoreflect.FullName(name), func(xt protoreflect.ExtensionType) bool {
numbers = append(numbers, int32(xt.TypeDescriptor().Number()))
return true
})
sort.Slice(numbers, func(i, j int) bool {
return numbers[i] < numbers[j]
})
if len(numbers) == 0 {
// maybe return an error if given type name is not known
if _, err := s.DescResolver.FindDescriptorByName(protoreflect.FullName(name)); err != nil {
return nil, err
}
}
return numbers, nil
}
// ListServices returns the names of services this server exposes.
func (s *ServerReflectionServer) ListServices() []*v1reflectionpb.ServiceResponse {
serviceInfo := s.S.GetServiceInfo()
resp := make([]*v1reflectionpb.ServiceResponse, 0, len(serviceInfo))
for svc := range serviceInfo {
resp = append(resp, &v1reflectionpb.ServiceResponse{Name: svc})
}
sort.Slice(resp, func(i, j int) bool {
return resp[i].Name < resp[j].Name
})
return resp
}
// ServerReflectionInfo is the reflection service handler.
func (s *ServerReflectionServer) ServerReflectionInfo(stream v1reflectiongrpc.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := &v1reflectionpb.ServerReflectionResponse{
ValidHost: in.Host,
OriginalRequest: in,
}
switch req := in.MessageRequest.(type) {
case *v1reflectionpb.ServerReflectionRequest_FileByFilename:
var b [][]byte
fd, err := s.DescResolver.FindFileByPath(req.FileByFilename)
if err == nil {
b, err = s.FileDescWithDependencies(fd, sentFileDescriptors)
}
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingSymbol:
b, err := s.FileDescEncodingContainingSymbol(req.FileContainingSymbol, sentFileDescriptors)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingExtension:
typeName := req.FileContainingExtension.ContainingType
extNum := req.FileContainingExtension.ExtensionNumber
b, err := s.FileDescEncodingContainingExtension(typeName, extNum, sentFileDescriptors)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
extNums, err := s.AllExtensionNumbersForTypeName(req.AllExtensionNumbersOfType)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &v1reflectionpb.ExtensionNumberResponse{
BaseTypeName: req.AllExtensionNumbersOfType,
ExtensionNumber: extNums,
},
}
}
case *v1reflectionpb.ServerReflectionRequest_ListServices:
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &v1reflectionpb.ListServiceResponse{
Service: s.ListServices(),
},
}
default:
return status.Errorf(codes.InvalidArgument, "invalid MessageRequest: %v", in.MessageRequest)
}
if err := stream.Send(out); err != nil {
return err
}
}
}
// V1ToV1AlphaResponse converts a v1 ServerReflectionResponse to a v1alpha.
func V1ToV1AlphaResponse(v1 *v1reflectionpb.ServerReflectionResponse) *v1alphareflectionpb.ServerReflectionResponse {
var v1alpha v1alphareflectionpb.ServerReflectionResponse
v1alpha.ValidHost = v1.ValidHost
if v1.OriginalRequest != nil {
v1alpha.OriginalRequest = V1ToV1AlphaRequest(v1.OriginalRequest)
}
switch mr := v1.MessageResponse.(type) {
case *v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1alphareflectionpb.FileDescriptorResponse{
FileDescriptorProto: mr.FileDescriptorResponse.GetFileDescriptorProto(),
},
}
}
case *v1reflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &v1alphareflectionpb.ExtensionNumberResponse{
BaseTypeName: mr.AllExtensionNumbersResponse.GetBaseTypeName(),
ExtensionNumber: mr.AllExtensionNumbersResponse.GetExtensionNumber(),
},
}
}
case *v1reflectionpb.ServerReflectionResponse_ListServicesResponse:
if mr != nil {
svcs := make([]*v1alphareflectionpb.ServiceResponse, len(mr.ListServicesResponse.GetService()))
for i, svc := range mr.ListServicesResponse.GetService() {
svcs[i] = &v1alphareflectionpb.ServiceResponse{
Name: svc.GetName(),
}
}
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &v1alphareflectionpb.ListServiceResponse{
Service: svcs,
},
}
}
case *v1reflectionpb.ServerReflectionResponse_ErrorResponse:
if mr != nil {
v1alpha.MessageResponse = &v1alphareflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1alphareflectionpb.ErrorResponse{
ErrorCode: mr.ErrorResponse.GetErrorCode(),
ErrorMessage: mr.ErrorResponse.GetErrorMessage(),
},
}
}
default:
// no value set
}
return &v1alpha
}
// V1AlphaToV1Request converts a v1alpha ServerReflectionRequest to a v1.
func V1AlphaToV1Request(v1alpha *v1alphareflectionpb.ServerReflectionRequest) *v1reflectionpb.ServerReflectionRequest {
var v1 v1reflectionpb.ServerReflectionRequest
v1.Host = v1alpha.Host
switch mr := v1alpha.MessageRequest.(type) {
case *v1alphareflectionpb.ServerReflectionRequest_FileByFilename:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileByFilename{
FileByFilename: mr.FileByFilename,
}
case *v1alphareflectionpb.ServerReflectionRequest_FileContainingSymbol:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: mr.FileContainingSymbol,
}
case *v1alphareflectionpb.ServerReflectionRequest_FileContainingExtension:
if mr.FileContainingExtension != nil {
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &v1reflectionpb.ExtensionRequest{
ContainingType: mr.FileContainingExtension.GetContainingType(),
ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(),
},
}
}
case *v1alphareflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType{
AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType,
}
case *v1alphareflectionpb.ServerReflectionRequest_ListServices:
v1.MessageRequest = &v1reflectionpb.ServerReflectionRequest_ListServices{
ListServices: mr.ListServices,
}
default:
// no value set
}
return &v1
}
// V1ToV1AlphaRequest converts a v1 ServerReflectionRequest to a v1alpha.
func V1ToV1AlphaRequest(v1 *v1reflectionpb.ServerReflectionRequest) *v1alphareflectionpb.ServerReflectionRequest {
var v1alpha v1alphareflectionpb.ServerReflectionRequest
v1alpha.Host = v1.Host
switch mr := v1.MessageRequest.(type) {
case *v1reflectionpb.ServerReflectionRequest_FileByFilename:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileByFilename{
FileByFilename: mr.FileByFilename,
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingSymbol:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: mr.FileContainingSymbol,
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingExtension:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &v1alphareflectionpb.ExtensionRequest{
ContainingType: mr.FileContainingExtension.GetContainingType(),
ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(),
},
}
}
case *v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType{
AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType,
}
}
case *v1reflectionpb.ServerReflectionRequest_ListServices:
if mr != nil {
v1alpha.MessageRequest = &v1alphareflectionpb.ServerReflectionRequest_ListServices{
ListServices: mr.ListServices,
}
}
default:
// no value set
}
return &v1alpha
}
// V1AlphaToV1Response converts a v1alpha ServerReflectionResponse to a v1.
func V1AlphaToV1Response(v1alpha *v1alphareflectionpb.ServerReflectionResponse) *v1reflectionpb.ServerReflectionResponse {
var v1 v1reflectionpb.ServerReflectionResponse
v1.ValidHost = v1alpha.ValidHost
if v1alpha.OriginalRequest != nil {
v1.OriginalRequest = V1AlphaToV1Request(v1alpha.OriginalRequest)
}
switch mr := v1alpha.MessageResponse.(type) {
case *v1alphareflectionpb.ServerReflectionResponse_FileDescriptorResponse:
if mr != nil {
v1.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{
FileDescriptorProto: mr.FileDescriptorResponse.GetFileDescriptorProto(),
},
}
}
case *v1alphareflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse:
if mr != nil {
v1.MessageResponse = &v1reflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &v1reflectionpb.ExtensionNumberResponse{
BaseTypeName: mr.AllExtensionNumbersResponse.GetBaseTypeName(),
ExtensionNumber: mr.AllExtensionNumbersResponse.GetExtensionNumber(),
},
}
}
case *v1alphareflectionpb.ServerReflectionResponse_ListServicesResponse:
if mr != nil {
svcs := make([]*v1reflectionpb.ServiceResponse, len(mr.ListServicesResponse.GetService()))
for i, svc := range mr.ListServicesResponse.GetService() {
svcs[i] = &v1reflectionpb.ServiceResponse{
Name: svc.GetName(),
}
}
v1.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &v1reflectionpb.ListServiceResponse{
Service: svcs,
},
}
}
case *v1alphareflectionpb.ServerReflectionResponse_ErrorResponse:
if mr != nil {
v1.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: mr.ErrorResponse.GetErrorCode(),
ErrorMessage: mr.ErrorResponse.GetErrorMessage(),
},
}
}
default:
// no value set
}
return &v1
}

View File

@ -37,19 +37,13 @@ To register server reflection on a gRPC server:
package reflection // import "google.golang.org/grpc/reflection"
import (
"io"
"sort"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/reflection/internal"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
v1reflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1reflectionpb "google.golang.org/grpc/reflection/grpc_reflection_v1"
v1alphareflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
)
@ -158,203 +152,9 @@ func NewServerV1(opts ServerOptions) v1reflectiongrpc.ServerReflectionServer {
if opts.ExtensionResolver == nil {
opts.ExtensionResolver = protoregistry.GlobalTypes
}
return &serverReflectionServer{
s: opts.Services,
descResolver: opts.DescriptorResolver,
extResolver: opts.ExtensionResolver,
}
}
type serverReflectionServer struct {
v1alphareflectiongrpc.UnimplementedServerReflectionServer
s ServiceInfoProvider
descResolver protodesc.Resolver
extResolver ExtensionResolver
}
// fileDescWithDependencies returns a slice of serialized fileDescriptors in
// wire format ([]byte). The fileDescriptors will include fd and all the
// transitive dependencies of fd with names not in sentFileDescriptors.
func (s *serverReflectionServer) fileDescWithDependencies(fd protoreflect.FileDescriptor, sentFileDescriptors map[string]bool) ([][]byte, error) {
if fd.IsPlaceholder() {
// If the given root file is a placeholder, treat it
// as missing instead of serializing it.
return nil, protoregistry.NotFound
}
var r [][]byte
queue := []protoreflect.FileDescriptor{fd}
for len(queue) > 0 {
currentfd := queue[0]
queue = queue[1:]
if currentfd.IsPlaceholder() {
// Skip any missing files in the dependency graph.
continue
}
if sent := sentFileDescriptors[currentfd.Path()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.Path()] = true
fdProto := protodesc.ToFileDescriptorProto(currentfd)
currentfdEncoded, err := proto.Marshal(fdProto)
if err != nil {
return nil, err
}
r = append(r, currentfdEncoded)
}
for i := 0; i < currentfd.Imports().Len(); i++ {
queue = append(queue, currentfd.Imports().Get(i))
}
}
return r, nil
}
// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result. The given symbol
// can be a type, a service or a method.
func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
d, err := s.descResolver.FindDescriptorByName(protoreflect.FullName(name))
if err != nil {
return nil, err
}
return s.fileDescWithDependencies(d.ParentFile(), sentFileDescriptors)
}
// fileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
xt, err := s.extResolver.FindExtensionByNumber(protoreflect.FullName(typeName), protoreflect.FieldNumber(extNum))
if err != nil {
return nil, err
}
return s.fileDescWithDependencies(xt.TypeDescriptor().ParentFile(), sentFileDescriptors)
}
// allExtensionNumbersForTypeName returns all extension numbers for the given type.
func (s *serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
var numbers []int32
s.extResolver.RangeExtensionsByMessage(protoreflect.FullName(name), func(xt protoreflect.ExtensionType) bool {
numbers = append(numbers, int32(xt.TypeDescriptor().Number()))
return true
})
sort.Slice(numbers, func(i, j int) bool {
return numbers[i] < numbers[j]
})
if len(numbers) == 0 {
// maybe return an error if given type name is not known
if _, err := s.descResolver.FindDescriptorByName(protoreflect.FullName(name)); err != nil {
return nil, err
}
}
return numbers, nil
}
// listServices returns the names of services this server exposes.
func (s *serverReflectionServer) listServices() []*v1reflectionpb.ServiceResponse {
serviceInfo := s.s.GetServiceInfo()
resp := make([]*v1reflectionpb.ServiceResponse, 0, len(serviceInfo))
for svc := range serviceInfo {
resp = append(resp, &v1reflectionpb.ServiceResponse{Name: svc})
}
sort.Slice(resp, func(i, j int) bool {
return resp[i].Name < resp[j].Name
})
return resp
}
// ServerReflectionInfo is the reflection service handler.
func (s *serverReflectionServer) ServerReflectionInfo(stream v1reflectiongrpc.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := &v1reflectionpb.ServerReflectionResponse{
ValidHost: in.Host,
OriginalRequest: in,
}
switch req := in.MessageRequest.(type) {
case *v1reflectionpb.ServerReflectionRequest_FileByFilename:
var b [][]byte
fd, err := s.descResolver.FindFileByPath(req.FileByFilename)
if err == nil {
b, err = s.fileDescWithDependencies(fd, sentFileDescriptors)
}
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingSymbol:
b, err := s.fileDescEncodingContainingSymbol(req.FileContainingSymbol, sentFileDescriptors)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_FileContainingExtension:
typeName := req.FileContainingExtension.ContainingType
extNum := req.FileContainingExtension.ExtensionNumber
b, err := s.fileDescEncodingContainingExtension(typeName, extNum, sentFileDescriptors)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &v1reflectionpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *v1reflectionpb.ServerReflectionRequest_AllExtensionNumbersOfType:
extNums, err := s.allExtensionNumbersForTypeName(req.AllExtensionNumbersOfType)
if err != nil {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &v1reflectionpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &v1reflectionpb.ExtensionNumberResponse{
BaseTypeName: req.AllExtensionNumbersOfType,
ExtensionNumber: extNums,
},
}
}
case *v1reflectionpb.ServerReflectionRequest_ListServices:
out.MessageResponse = &v1reflectionpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &v1reflectionpb.ListServiceResponse{
Service: s.listServices(),
},
}
default:
return status.Errorf(codes.InvalidArgument, "invalid MessageRequest: %v", in.MessageRequest)
}
if err := stream.Send(out); err != nil {
return err
}
return &internal.ServerReflectionServer{
S: opts.Services,
DescResolver: opts.DescriptorResolver,
ExtResolver: opts.ExtensionResolver,
}
}

View File

@ -63,7 +63,7 @@ LEGACY_SOURCES=(
# Generates only the new gRPC Service symbols
SOURCES=(
$(git ls-files --exclude-standard --cached --others "*.proto" | grep -v '^\(profiling/proto/service.proto\|reflection/grpc_reflection_v1alpha/reflection.proto\)$')
$(git ls-files --exclude-standard --cached --others "*.proto" | grep -v '^profiling/proto/service.proto$')
${WORKDIR}/grpc-proto/grpc/gcp/altscontext.proto
${WORKDIR}/grpc-proto/grpc/gcp/handshaker.proto
${WORKDIR}/grpc-proto/grpc/gcp/transport_security_common.proto
@ -93,7 +93,7 @@ Mgrpc/testing/empty.proto=google.golang.org/grpc/interop/grpc_testing
for src in ${SOURCES[@]}; do
echo "protoc ${src}"
protoc --go_out=${OPTS}:${WORKDIR}/out --go-grpc_out=${OPTS}:${WORKDIR}/out \
protoc --go_out=${OPTS}:${WORKDIR}/out --go-grpc_out=${OPTS},use_generic_streams_experimental=true:${WORKDIR}/out \
-I"." \
-I${WORKDIR}/grpc-proto \
-I${WORKDIR}/googleapis \
@ -118,6 +118,6 @@ mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/
# grpc_testing_not_regenerate/*.pb.go are not re-generated,
# see grpc_testing_not_regenerate/README.md for details.
rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go
rm ${WORKDIR}/out/google.golang.org/grpc/reflection/test/grpc_testing_not_regenerate/*.pb.go
cp -R ${WORKDIR}/out/google.golang.org/grpc/* .

View File

@ -18,9 +18,6 @@
// Package dns implements a dns resolver to be installed as the default resolver
// in grpc.
//
// Deprecated: this package is imported by grpc and should not need to be
// imported directly by users.
package dns
import (
@ -52,3 +49,12 @@ func SetResolvingTimeout(timeout time.Duration) {
func NewBuilder() resolver.Builder {
return dns.NewBuilder()
}
// SetMinResolutionInterval sets the default minimum interval at which DNS
// re-resolutions are allowed. This helps to prevent excessive re-resolution.
//
// It must be called only at application startup, before any gRPC calls are
// made. Modifying this value after initialization is not thread-safe.
func SetMinResolutionInterval(d time.Duration) {
dns.MinResolutionInterval = d
}

View File

@ -964,7 +964,7 @@ func setCallInfoCodec(c *callInfo) error {
// The SupportPackageIsVersion variables are referenced from generated protocol
// buffer files to ensure compatibility with the gRPC version used. The latest
// support package version is 7.
// support package version is 9.
//
// Older versions are kept for compatibility.
//
@ -976,6 +976,7 @@ const (
SupportPackageIsVersion6 = true
SupportPackageIsVersion7 = true
SupportPackageIsVersion8 = true
SupportPackageIsVersion9 = true
)
const grpcUA = "grpc-go/" + Version

View File

@ -527,12 +527,22 @@ func ConnectionTimeout(d time.Duration) ServerOption {
})
}
// MaxHeaderListSizeServerOption is a ServerOption that sets the max
// (uncompressed) size of header list that the server is prepared to accept.
type MaxHeaderListSizeServerOption struct {
MaxHeaderListSize uint32
}
func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
so.maxHeaderListSize = &o.MaxHeaderListSize
}
// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.maxHeaderListSize = &s
})
return MaxHeaderListSizeServerOption{
MaxHeaderListSize: s,
}
}
// HeaderTableSize returns a ServerOption that sets the size of dynamic

View File

@ -172,7 +172,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
sc := ServiceConfig{
@ -219,7 +219,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
Timeout: (*time.Duration)(m.Timeout),
}
if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
logger.Warningf("grpc: unmarshaling service config %s: %v", js, err)
logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
return &serviceconfig.ParseResult{Err: err}
}
if m.MaxRequestMessageBytes != nil {
@ -239,13 +239,13 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
for i, n := range *m.Name {
path, err := n.generatePath()
if err != nil {
logger.Warningf("grpc: error unmarshaling service config %s due to methodConfig[%d]: %v", js, i, err)
logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
return &serviceconfig.ParseResult{Err: err}
}
if _, ok := paths[path]; ok {
err = errDuplicatedName
logger.Warningf("grpc: error unmarshaling service config %s due to methodConfig[%d]: %v", js, i, err)
logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
return &serviceconfig.ParseResult{Err: err}
}
paths[path] = struct{}{}

View File

@ -73,9 +73,12 @@ func (*PickerUpdated) isRPCStats() {}
type InPayload struct {
// Client is true if this InPayload is from client side.
Client bool
// Payload is the payload with original type.
// Payload is the payload with original type. This may be modified after
// the call to HandleRPC which provides the InPayload returns and must be
// copied if needed later.
Payload any
// Data is the serialized message payload.
// Deprecated: Data will be removed in the next release.
Data []byte
// Length is the size of the uncompressed payload data. Does not include any
@ -143,9 +146,12 @@ func (s *InTrailer) isRPCStats() {}
type OutPayload struct {
// Client is true if this OutPayload is from client side.
Client bool
// Payload is the payload with original type.
// Payload is the payload with original type. This may be modified after
// the call to HandleRPC which provides the OutPayload returns and must be
// copied if needed later.
Payload any
// Data is the serialized message payload.
// Deprecated: Data will be removed in the next release.
Data []byte
// Length is the size of the uncompressed payload data. Does not include any
// framing (gRPC or HTTP/2).

View File

@ -516,6 +516,7 @@ func (a *csAttempt) newStream() error {
return toRPCErr(nse.Err)
}
a.s = s
a.ctx = s.Context()
a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}
return nil
}

152
vendor/google.golang.org/grpc/stream_interfaces.go generated vendored Normal file
View File

@ -0,0 +1,152 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
// ServerStreamingClient represents the client side of a server-streaming (one
// request, many responses) RPC. It is generic over the type of the response
// message. It is used in generated code.
type ServerStreamingClient[Res any] interface {
Recv() (*Res, error)
ClientStream
}
// ServerStreamingServer represents the server side of a server-streaming (one
// request, many responses) RPC. It is generic over the type of the response
// message. It is used in generated code.
type ServerStreamingServer[Res any] interface {
Send(*Res) error
ServerStream
}
// ClientStreamingClient represents the client side of a client-streaming (many
// requests, one response) RPC. It is generic over both the type of the request
// message stream and the type of the unary response message. It is used in
// generated code.
type ClientStreamingClient[Req any, Res any] interface {
Send(*Req) error
CloseAndRecv() (*Res, error)
ClientStream
}
// ClientStreamingServer represents the server side of a client-streaming (many
// requests, one response) RPC. It is generic over both the type of the request
// message stream and the type of the unary response message. It is used in
// generated code.
type ClientStreamingServer[Req any, Res any] interface {
Recv() (*Req, error)
SendAndClose(*Res) error
ServerStream
}
// BidiStreamingClient represents the client side of a bidirectional-streaming
// (many requests, many responses) RPC. It is generic over both the type of the
// request message stream and the type of the response message stream. It is
// used in generated code.
type BidiStreamingClient[Req any, Res any] interface {
Send(*Req) error
Recv() (*Res, error)
ClientStream
}
// BidiStreamingServer represents the server side of a bidirectional-streaming
// (many requests, many responses) RPC. It is generic over both the type of the
// request message stream and the type of the response message stream. It is
// used in generated code.
type BidiStreamingServer[Req any, Res any] interface {
Recv() (*Req, error)
Send(*Res) error
ServerStream
}
// GenericClientStream implements the ServerStreamingClient, ClientStreamingClient,
// and BidiStreamingClient interfaces. It is used in generated code.
type GenericClientStream[Req any, Res any] struct {
ClientStream
}
var _ ServerStreamingClient[string] = (*GenericClientStream[int, string])(nil)
var _ ClientStreamingClient[int, string] = (*GenericClientStream[int, string])(nil)
var _ BidiStreamingClient[int, string] = (*GenericClientStream[int, string])(nil)
// Send pushes one message into the stream of requests to be consumed by the
// server. The type of message which can be sent is determined by the Req type
// parameter of the GenericClientStream receiver.
func (x *GenericClientStream[Req, Res]) Send(m *Req) error {
return x.ClientStream.SendMsg(m)
}
// Recv reads one message from the stream of responses generated by the server.
// The type of the message returned is determined by the Res type parameter
// of the GenericClientStream receiver.
func (x *GenericClientStream[Req, Res]) Recv() (*Res, error) {
m := new(Res)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// CloseAndRecv closes the sending side of the stream, then receives the unary
// response from the server. The type of message which it returns is determined
// by the Res type parameter of the GenericClientStream receiver.
func (x *GenericClientStream[Req, Res]) CloseAndRecv() (*Res, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(Res)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// GenericServerStream implements the ServerStreamingServer, ClientStreamingServer,
// and BidiStreamingServer interfaces. It is used in generated code.
type GenericServerStream[Req any, Res any] struct {
ServerStream
}
var _ ServerStreamingServer[string] = (*GenericServerStream[int, string])(nil)
var _ ClientStreamingServer[int, string] = (*GenericServerStream[int, string])(nil)
var _ BidiStreamingServer[int, string] = (*GenericServerStream[int, string])(nil)
// Send pushes one message into the stream of responses to be consumed by the
// client. The type of message which can be sent is determined by the Res
// type parameter of the serverStreamServer receiver.
func (x *GenericServerStream[Req, Res]) Send(m *Res) error {
return x.ServerStream.SendMsg(m)
}
// SendAndClose pushes the unary response to the client. The type of message
// which can be sent is determined by the Res type parameter of the
// clientStreamServer receiver.
func (x *GenericServerStream[Req, Res]) SendAndClose(m *Res) error {
return x.ServerStream.SendMsg(m)
}
// Recv reads one message from the stream of requests generated by the client.
// The type of the message returned is determined by the Req type parameter
// of the clientStreamServer receiver.
func (x *GenericServerStream[Req, Res]) Recv() (*Req, error) {
m := new(Req)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.63.2"
const Version = "1.64.0"

195
vendor/google.golang.org/grpc/vet.sh generated vendored
View File

@ -1,195 +0,0 @@
#!/bin/bash
set -ex # Exit on error; debugging enabled.
set -o pipefail # Fail a pipe if any sub-command fails.
# not makes sure the command passed to it does not exit with a return code of 0.
not() {
# This is required instead of the earlier (! $COMMAND) because subshells and
# pipefail don't work the same on Darwin as in Linux.
! "$@"
}
die() {
echo "$@" >&2
exit 1
}
fail_on_output() {
tee /dev/stderr | not read
}
# Check to make sure it's safe to modify the user's git repo.
git status --porcelain | fail_on_output
# Undo any edits made by this script.
cleanup() {
git reset --hard HEAD
}
trap cleanup EXIT
PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}"
go version
if [[ "$1" = "-install" ]]; then
# Install the pinned versions as defined in module tools.
pushd ./test/tools
go install \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell
popd
if [[ -z "${VET_SKIP_PROTO}" ]]; then
if [[ "${GITHUB_ACTIONS}" = "true" ]]; then
PROTOBUF_VERSION=25.2 # a.k.a. v4.22.0 in pb.go files.
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
pushd /home/runner/go
wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
unzip ${PROTOC_FILENAME}
bin/protoc --version
popd
elif not which protoc > /dev/null; then
die "Please install protoc into your path"
fi
fi
exit 0
elif [[ "$#" -ne 0 ]]; then
die "Unknown argument(s): $*"
fi
# - Check that generated proto files are up to date.
if [[ -z "${VET_SKIP_PROTO}" ]]; then
make proto && git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
fi
if [[ -n "${VET_ONLY_PROTO}" ]]; then
exit 0
fi
# - Ensure all source files contain a copyright message.
# (Done in two parts because Darwin "git grep" has broken support for compound
# exclusion matches.)
(grep -L "DO NOT EDIT" $(git grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)" -- '*.go') || true) | fail_on_output
# - Make sure all tests in grpc and grpc/test use leakcheck via Teardown.
not grep 'func Test[^(]' *_test.go
not grep 'func Test[^(]' test/*.go
# - Check for typos in test function names
git grep 'func (s) ' -- "*_test.go" | not grep -v 'func (s) Test'
git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Example'
# - Do not import x/net/context.
not git grep -l 'x/net/context' -- "*.go"
# - Do not use time.After except in tests. It has the potential to leak the
# timer since there is no way to stop it early.
git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|test_utils\|testutils'
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^interop/stress\|grpcrand\|^benchmark\|wrr_test'
# - Do not use "interface{}"; use "any" instead.
git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc\|grpc_testing_not_regenerate'
# - Do not call grpclog directly. Use grpclog.Component instead.
git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpclog.F' --or -e 'grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go'
# - Ensure all ptypes proto packages are renamed when importing.
not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"
# - Ensure all usages of grpc_testing package are renamed when importing.
not git grep "\(import \|^\s*\)\"google.golang.org/grpc/interop/grpc_testing" -- "*.go"
# - Ensure all xds proto imports are renamed to *pb or *grpc.
git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.pb.go' | not grep -v 'pb "\|grpc "'
misspell -error .
# - gofmt, goimports, go vet, go mod tidy.
# Perform these checks on each module inside gRPC.
for MOD_FILE in $(find . -name 'go.mod'); do
MOD_DIR=$(dirname ${MOD_FILE})
pushd ${MOD_DIR}
go vet -all ./... | fail_on_output
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
go mod tidy -compat=1.19
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1)
popd
done
# - Collection of static analysis checks
SC_OUT="$(mktemp)"
staticcheck -go 1.19 -checks 'all' ./... > "${SC_OUT}" || true
# Error for anything other than checks that need exclusions.
grep -v "(ST1000)" "${SC_OUT}" | grep -v "(SA1019)" | grep -v "(ST1003)" | not grep -v "(ST1019)\|\(other import of\)"
# Exclude underscore checks for generated code.
grep "(ST1003)" "${SC_OUT}" | not grep -v '\(.pb.go:\)\|\(code_string_test.go:\)\|\(grpc_testing_not_regenerate\)'
# Error for duplicate imports not including grpc protos.
grep "(ST1019)\|\(other import of\)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused
channelz/grpc_channelz_v1"
go-control-plane/envoy
grpclb/grpc_lb_v1"
health/grpc_health_v1"
interop/grpc_testing"
orca/v3"
proto/grpc_gcp"
proto/grpc_lookup_v1"
reflection/grpc_reflection_v1"
reflection/grpc_reflection_v1alpha"
XXXXX PleaseIgnoreUnused'
# Error for any package comments not in generated code.
grep "(ST1000)" "${SC_OUT}" | not grep -v "\.pb\.go:"
# Only ignore the following deprecated types/fields/functions and exclude
# generated code.
grep "(SA1019)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused
XXXXX Protobuf related deprecation errors:
"github.com/golang/protobuf
.pb.go:
grpc_testing_not_regenerate
: ptypes.
proto.RegisterType
XXXXX gRPC internal usage deprecation errors:
"google.golang.org/grpc
: grpc.
: v1alpha.
: v1alphareflectionpb.
BalancerAttributes is deprecated:
CredsBundle is deprecated:
Metadata is deprecated: use Attributes instead.
NewSubConn is deprecated:
OverrideServerName is deprecated:
RemoveSubConn is deprecated:
SecurityVersion is deprecated:
Target is deprecated: Use the Target field in the BuildOptions instead.
UpdateAddresses is deprecated:
UpdateSubConnState is deprecated:
balancer.ErrTransientFailure is deprecated:
grpc/reflection/v1alpha/reflection.proto
SwitchTo is deprecated:
XXXXX xDS deprecated fields we support
.ExactMatch
.PrefixMatch
.SafeRegexMatch
.SuffixMatch
GetContainsMatch
GetExactMatch
GetMatchSubjectAltNames
GetPrefixMatch
GetSafeRegexMatch
GetSuffixMatch
GetTlsCertificateCertificateProviderInstance
GetValidationContextCertificateProviderInstance
XXXXX PleaseIgnoreUnused'
echo SUCCESS

11
vendor/modules.txt vendored
View File

@ -574,7 +574,7 @@ golang.org/x/net/internal/timeseries
golang.org/x/net/proxy
golang.org/x/net/trace
golang.org/x/net/websocket
# golang.org/x/oauth2 v0.17.0
# golang.org/x/oauth2 v0.18.0
## explicit; go 1.18
golang.org/x/oauth2
golang.org/x/oauth2/internal
@ -613,15 +613,15 @@ google.golang.org/appengine/internal/log
google.golang.org/appengine/internal/remote_api
google.golang.org/appengine/internal/urlfetch
google.golang.org/appengine/urlfetch
# google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de
# google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237
## explicit; go 1.19
google.golang.org/genproto/googleapis/api/httpbody
# google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be
## explicit; go 1.19
# google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5
## explicit; go 1.20
google.golang.org/genproto/googleapis/rpc/code
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.63.2
# google.golang.org/grpc v1.64.0
## explicit; go 1.19
google.golang.org/grpc
google.golang.org/grpc/attributes
@ -674,6 +674,7 @@ google.golang.org/grpc/peer
google.golang.org/grpc/reflection
google.golang.org/grpc/reflection/grpc_reflection_v1
google.golang.org/grpc/reflection/grpc_reflection_v1alpha
google.golang.org/grpc/reflection/internal
google.golang.org/grpc/resolver
google.golang.org/grpc/resolver/dns
google.golang.org/grpc/serviceconfig