Update vendor

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2024-02-05 11:48:16 -08:00
parent 047d42e901
commit da1673f55d
4 changed files with 84 additions and 7 deletions

View File

@ -71,6 +71,42 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
} }
} }
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
return func(c *Client) {
if len(interceptors) == 0 {
return
}
if c.interceptor != nil {
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
req *Request,
reply *Response,
info *UnaryClientInfo,
final Invoker,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
}
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
if len(interceptors) == 0 {
return final
}
return func(
ctx context.Context,
req *Request,
reply *Response,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
// NewClient creates a new ttrpc client using the given connection // NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client { func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -85,13 +121,16 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx: ctx, ctx: ctx,
userCloseFunc: func() {}, userCloseFunc: func() {},
userCloseWaitCh: make(chan struct{}), userCloseWaitCh: make(chan struct{}),
interceptor: defaultClientInterceptor,
} }
for _, o := range opts { for _, o := range opts {
o(c) o(c)
} }
if c.interceptor == nil {
c.interceptor = defaultClientInterceptor
}
go c.run() go c.run()
return c return c
} }
@ -286,7 +325,7 @@ func (c *Client) Close() error {
return nil return nil
} }
// UserOnCloseWait is used to blocks untils the user's on-close callback // UserOnCloseWait is used to block until the user's on-close callback
// finishes. // finishes.
func (c *Client) UserOnCloseWait(ctx context.Context) error { func (c *Client) UserOnCloseWait(ctx context.Context) error {
select { select {

View File

@ -16,7 +16,10 @@
package ttrpc package ttrpc
import "errors" import (
"context"
"errors"
)
type serverConfig struct { type serverConfig struct {
handshaker Handshaker handshaker Handshaker
@ -44,9 +47,40 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt { func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error { return func(c *serverConfig) error {
if c.interceptor != nil { if c.interceptor != nil {
return errors.New("only one interceptor allowed per server") return errors.New("only one unchained interceptor allowed per server")
} }
c.interceptor = i c.interceptor = i
return nil return nil
} }
} }
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if len(interceptors) == 0 {
return nil
}
if c.interceptor != nil {
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
unmarshal Unmarshaler,
info *UnaryServerInfo,
method Method) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
return nil
}
}
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
if len(interceptors) == 0 {
return method
}
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
}

View File

@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true) respond(st, p, stream.StreamingServer, true)
}() }()
if req.Payload != nil { // Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
unmarshal := func(obj interface{}) error { unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj) return protoUnmarshal(req.Payload, obj)
} }

4
vendor/modules.txt vendored
View File

@ -150,8 +150,8 @@ github.com/containerd/platforms
github.com/containerd/plugin github.com/containerd/plugin
github.com/containerd/plugin/dynamic github.com/containerd/plugin/dynamic
github.com/containerd/plugin/registry github.com/containerd/plugin/registry
# github.com/containerd/ttrpc v1.2.2 # github.com/containerd/ttrpc v1.2.3
## explicit; go 1.13 ## explicit; go 1.19
github.com/containerd/ttrpc github.com/containerd/ttrpc
# github.com/containerd/typeurl/v2 v2.1.1 # github.com/containerd/typeurl/v2 v2.1.1
## explicit; go 1.13 ## explicit; go 1.13