Add client.Reconnect API
This adds a reconnect api to the client so that the client instance stays the same and on reconnect, all tasks and containers with references to the *Client have the correct connection. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
255ad41cfc
commit
7b653dc9ed
38
client.go
38
client.go
@ -93,11 +93,22 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
|
||||
grpc.WithStreamInterceptor(stream),
|
||||
)
|
||||
}
|
||||
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||
connector := func() (*grpc.ClientConn, error) {
|
||||
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
return NewWithConn(conn, opts...)
|
||||
conn, err := connector()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client{
|
||||
conn: conn,
|
||||
connector: connector,
|
||||
runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewWithConn returns a new containerd client that is connected to the containerd
|
||||
@ -112,8 +123,23 @@ func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
|
||||
// Client is the client to interact with containerd and its various services
|
||||
// using a uniform interface
|
||||
type Client struct {
|
||||
conn *grpc.ClientConn
|
||||
runtime string
|
||||
conn *grpc.ClientConn
|
||||
runtime string
|
||||
connector func() (*grpc.ClientConn, error)
|
||||
}
|
||||
|
||||
// Reconnect re-establishes the GRPC connection to the containerd daemon
|
||||
func (c *Client) Reconnect() error {
|
||||
if c.connector == nil {
|
||||
return errors.New("unable to reconnect to containerd, no connector available")
|
||||
}
|
||||
c.conn.Close()
|
||||
conn, err := c.connector()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsServing returns true if the client can successfully connect to the
|
||||
|
@ -191,3 +191,37 @@ func TestImagePull(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientReconnect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := testContext()
|
||||
defer cancel()
|
||||
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if client == nil {
|
||||
t.Fatal("New() returned nil client")
|
||||
}
|
||||
ok, err := client.IsServing(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("containerd is not serving")
|
||||
}
|
||||
if err := client.Reconnect(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if ok, err = client.IsServing(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("containerd is not serving")
|
||||
}
|
||||
if err := client.Close(); err != nil {
|
||||
t.Errorf("client closed returned errror %v", err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user