fix dial error when clean up a dead shim

Signed-off-by: lifubang <lifubang@acmcoder.com>
This commit is contained in:
lifubang 2020-03-12 08:24:29 +08:00
parent c6851ace61
commit 488d6194f2
3 changed files with 44 additions and 18 deletions

View File

@ -44,8 +44,11 @@ func TestClientTTRPC_Reconnect(t *testing.T) {
err = client.Reconnect()
assert.NilError(t, err)
service, err := client.EventsService()
assert.NilError(t, err)
// Send test request to make sure its alive after reconnect
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{
_, err = service.Forward(context.Background(), &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: namespaces.Default,
@ -63,10 +66,13 @@ func TestClientTTRPC_Close(t *testing.T) {
client, err := ttrpcutil.NewClient(address + ".ttrpc")
assert.NilError(t, err)
service, err := client.EventsService()
assert.NilError(t, err)
err = client.Close()
assert.NilError(t, err)
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}})
_, err = service.Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}})
assert.Equal(t, err, ttrpc.ErrClosed)
err = client.Close()

View File

@ -50,14 +50,8 @@ func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
return client, nil
}
client, err := connector()
if err != nil {
return nil, err
}
return &Client{
connector: connector,
client: client,
}, nil
}
@ -74,6 +68,12 @@ func (c *Client) Reconnect() error {
return errors.New("client is closed")
}
if c.client != nil {
if err := c.client.Close(); err != nil {
return err
}
}
client, err := c.connector()
if err != nil {
return err
@ -84,16 +84,26 @@ func (c *Client) Reconnect() error {
}
// EventsService creates an EventsService client
func (c *Client) EventsService() v1.EventsService {
return v1.NewEventsClient(c.Client())
func (c *Client) EventsService() (v1.EventsService, error) {
client, err := c.Client()
if err != nil {
return nil, err
}
return v1.NewEventsClient(client), nil
}
// Client returns the underlying TTRPC client object
func (c *Client) Client() *ttrpc.Client {
func (c *Client) Client() (*ttrpc.Client, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.client
if c.client == nil {
client, err := c.connector()
if err != nil {
return nil, err
}
c.client = client
}
return c.client, nil
}
// Close closes the clients TTRPC connection to containerd
@ -102,5 +112,8 @@ func (c *Client) Close() error {
defer c.mu.Unlock()
c.closed = true
if c.client != nil {
return c.client.Close()
}
return nil
}

View File

@ -128,21 +128,28 @@ func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event
}
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
_, err := l.client.EventsService().Forward(ctx, req)
service, err := l.client.EventsService()
if err == nil {
_, err = service.Forward(ctx, req)
if err == nil {
return nil
}
}
if err != ttrpc.ErrClosed {
return err
}
// Reconnect and retry request
if err := l.client.Reconnect(); err != nil {
if err = l.client.Reconnect(); err != nil {
return err
}
if _, err := l.client.EventsService().Forward(ctx, req); err != nil {
service, err = l.client.EventsService()
if err != nil {
return err
}
if _, err = service.Forward(ctx, req); err != nil {
return err
}