client: add UserOnCloseWait function
ttrpc provides WithOnClose option for user and ttrpc will call the
callback function when connection is closed unexpectedly or the ttrpc
client's Close() method is called. containerd runtime plugin uses it
to handle cleanup the resources created by containerd shim.
But the ttrpc client's Close() is only trigger and the shim's cleanup
resource callback is called asynchronously, which might make part of
resources leaky. There is an example from containerd-runtime-v2 for
runc:
```happy
[Task.Delete goroutine] [cleanupCallback goroutine]
call ttrpc client.Close() -->
read bundle and call runc delete
delete bundle
```
If the cleanupCallback is called after deleting bundle, the callback
will fail to call runc delete. If there is any running processes, the
resource becomes leaky.
```unhappy
[Task.Delete goroutine] [cleanupCallback goroutine]
call ttrpc client.Close() -->
delete bundle
failed to read bundle and call runc delete
```
In order to avoid this, introduces the UserOnCloseWait to make sure that
the cleanupCallback has been called synchronously, like:
```
[Task.Delete goroutine] [cleanupCallback goroutine]
call ttrpc client.Close() -->
wait for callback
read bundle and call runc delete
<-- finish sync
delete bundle
```
Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
34
client.go
34
client.go
@@ -47,8 +47,9 @@ type Client struct {
|
||||
ctx context.Context
|
||||
closed func()
|
||||
|
||||
closeOnce sync.Once
|
||||
userCloseFunc func()
|
||||
closeOnce sync.Once
|
||||
userCloseFunc func()
|
||||
userCloseWaitCh chan struct{}
|
||||
|
||||
errOnce sync.Once
|
||||
err error
|
||||
@@ -75,14 +76,15 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
|
||||
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &Client{
|
||||
codec: codec{},
|
||||
conn: conn,
|
||||
channel: newChannel(conn),
|
||||
calls: make(chan *callRequest),
|
||||
closed: cancel,
|
||||
ctx: ctx,
|
||||
userCloseFunc: func() {},
|
||||
interceptor: defaultClientInterceptor,
|
||||
codec: codec{},
|
||||
conn: conn,
|
||||
channel: newChannel(conn),
|
||||
calls: make(chan *callRequest),
|
||||
closed: cancel,
|
||||
ctx: ctx,
|
||||
userCloseFunc: func() {},
|
||||
userCloseWaitCh: make(chan struct{}),
|
||||
interceptor: defaultClientInterceptor,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@@ -175,6 +177,17 @@ func (c *Client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UserOnCloseWait is used to blocks untils the user's on-close callback
|
||||
// finishes.
|
||||
func (c *Client) UserOnCloseWait(ctx context.Context) error {
|
||||
select {
|
||||
case <-c.userCloseWaitCh:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
type message struct {
|
||||
messageHeader
|
||||
p []byte
|
||||
@@ -251,6 +264,7 @@ func (c *Client) run() {
|
||||
defer func() {
|
||||
c.conn.Close()
|
||||
c.userCloseFunc()
|
||||
close(c.userCloseWaitCh)
|
||||
}()
|
||||
|
||||
for {
|
||||
|
||||
70
client_test.go
Normal file
70
client_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestUserOnCloseWait(t *testing.T) {
|
||||
var (
|
||||
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
|
||||
server = mustServer(t)(NewServer())
|
||||
testImpl = &testingServer{}
|
||||
addr, listener = newTestListener(t)
|
||||
)
|
||||
|
||||
defer cancel()
|
||||
defer listener.Close()
|
||||
|
||||
registerTestingService(server, testImpl)
|
||||
|
||||
go server.Serve(ctx, listener)
|
||||
defer server.Shutdown(ctx)
|
||||
|
||||
var (
|
||||
dataCh = make(chan string)
|
||||
client, cleanup = newTestClient(t, addr,
|
||||
WithOnClose(func() {
|
||||
dataCh <- time.Now().String()
|
||||
}),
|
||||
)
|
||||
|
||||
tp testPayload
|
||||
tclient = newTestingClient(client)
|
||||
)
|
||||
|
||||
if _, err := tclient.Test(ctx, &tp); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cleanup()
|
||||
|
||||
fctx, fcancel := context.WithDeadline(ctx, time.Now().Add(1*time.Second))
|
||||
defer fcancel()
|
||||
if err := client.UserOnCloseWait(fctx); err == nil || err != context.DeadlineExceeded {
|
||||
t.Fatalf("expected error %v, but got %v", context.DeadlineExceeded, err)
|
||||
}
|
||||
|
||||
_ = <-dataCh
|
||||
|
||||
if err := client.UserOnCloseWait(ctx); err != nil {
|
||||
t.Fatalf("expected error nil , but got %v", err)
|
||||
}
|
||||
}
|
||||
@@ -607,12 +607,12 @@ func roundTrip(ctx context.Context, t *testing.T, client *testingClient, value s
|
||||
}
|
||||
}
|
||||
|
||||
func newTestClient(t testing.TB, addr string) (*Client, func()) {
|
||||
func newTestClient(t testing.TB, addr string, opts ...ClientOpts) (*Client, func()) {
|
||||
conn, err := net.Dial("unix", addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
client := NewClient(conn)
|
||||
client := NewClient(conn, opts...)
|
||||
return client, func() {
|
||||
conn.Close()
|
||||
client.Close()
|
||||
|
||||
Reference in New Issue
Block a user