diff --git a/client_ttrpc_test.go b/client_ttrpc_test.go index 85b7aa383..41715d7f6 100644 --- a/client_ttrpc_test.go +++ b/client_ttrpc_test.go @@ -44,8 +44,11 @@ func TestClientTTRPC_Reconnect(t *testing.T) { err = client.Reconnect() assert.NilError(t, err) + service, err := client.EventsService() + assert.NilError(t, err) + // Send test request to make sure its alive after reconnect - _, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{ + _, err = service.Forward(context.Background(), &v1.ForwardRequest{ Envelope: &v1.Envelope{ Timestamp: time.Now(), Namespace: namespaces.Default, @@ -63,10 +66,13 @@ func TestClientTTRPC_Close(t *testing.T) { client, err := ttrpcutil.NewClient(address + ".ttrpc") assert.NilError(t, err) + service, err := client.EventsService() + assert.NilError(t, err) + err = client.Close() assert.NilError(t, err) - _, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}}) + _, err = service.Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}}) assert.Equal(t, err, ttrpc.ErrClosed) err = client.Close() diff --git a/pkg/ttrpcutil/client.go b/pkg/ttrpcutil/client.go index 5edb19dc3..8b4d925d2 100644 --- a/pkg/ttrpcutil/client.go +++ b/pkg/ttrpcutil/client.go @@ -50,14 +50,8 @@ func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) { return client, nil } - client, err := connector() - if err != nil { - return nil, err - } - return &Client{ connector: connector, - client: client, }, nil } @@ -74,6 +68,12 @@ func (c *Client) Reconnect() error { return errors.New("client is closed") } + if c.client != nil { + if err := c.client.Close(); err != nil { + return err + } + } + client, err := c.connector() if err != nil { return err @@ -84,16 +84,26 @@ func (c *Client) Reconnect() error { } // EventsService creates an EventsService client -func (c *Client) EventsService() v1.EventsService { - return v1.NewEventsClient(c.Client()) +func (c *Client) EventsService() (v1.EventsService, error) { + client, err := c.Client() + if err != nil { + return nil, err + } + return v1.NewEventsClient(client), nil } // Client returns the underlying TTRPC client object -func (c *Client) Client() *ttrpc.Client { +func (c *Client) Client() (*ttrpc.Client, error) { c.mu.Lock() defer c.mu.Unlock() - - return c.client + if c.client == nil { + client, err := c.connector() + if err != nil { + return nil, err + } + c.client = client + } + return c.client, nil } // Close closes the clients TTRPC connection to containerd @@ -102,5 +112,8 @@ func (c *Client) Close() error { defer c.mu.Unlock() c.closed = true - return c.client.Close() + if c.client != nil { + return c.client.Close() + } + return nil } diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go index 3dbd0e045..c5c4ecc15 100644 --- a/runtime/v2/shim/publisher.go +++ b/runtime/v2/shim/publisher.go @@ -128,9 +128,12 @@ func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event } func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error { - _, err := l.client.EventsService().Forward(ctx, req) + service, err := l.client.EventsService() if err == nil { - return nil + _, err = service.Forward(ctx, req) + if err == nil { + return nil + } } if err != ttrpc.ErrClosed { @@ -138,11 +141,15 @@ func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.Forw } // Reconnect and retry request - if err := l.client.Reconnect(); err != nil { + if err = l.client.Reconnect(); err != nil { return err } - if _, err := l.client.EventsService().Forward(ctx, req); err != nil { + service, err = l.client.EventsService() + if err != nil { + return err + } + if _, err = service.Forward(ctx, req); err != nil { return err }