Move ttrpc client to pkg/ttrpcutil

Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
This commit is contained in:
Maksym Pavlenko 2019-05-20 16:44:49 -07:00
parent 7b06c9a1ce
commit 7f79fbb245
5 changed files with 22 additions and 22 deletions

View File

@ -23,13 +23,14 @@ import (
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/ttrpcutil"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
"gotest.tools/assert" "gotest.tools/assert"
) )
func TestClientTTRPC_New(t *testing.T) { func TestClientTTRPC_New(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc") client, err := ttrpcutil.NewClient(address + ".ttrpc")
assert.NilError(t, err) assert.NilError(t, err)
err = client.Close() err = client.Close()
@ -37,7 +38,7 @@ func TestClientTTRPC_New(t *testing.T) {
} }
func TestClientTTRPC_Reconnect(t *testing.T) { func TestClientTTRPC_Reconnect(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc") client, err := ttrpcutil.NewClient(address + ".ttrpc")
assert.NilError(t, err) assert.NilError(t, err)
err = client.Reconnect() err = client.Reconnect()
@ -59,7 +60,7 @@ func TestClientTTRPC_Reconnect(t *testing.T) {
} }
func TestClientTTRPC_Close(t *testing.T) { func TestClientTTRPC_Close(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc") client, err := ttrpcutil.NewClient(address + ".ttrpc")
assert.NilError(t, err) assert.NilError(t, err)
err = client.Close() err = client.Close()

View File

@ -14,7 +14,7 @@
limitations under the License. limitations under the License.
*/ */
package containerd package ttrpcutil
import ( import (
"sync" "sync"
@ -29,16 +29,16 @@ const ttrpcDialTimeout = 5 * time.Second
type ttrpcConnector func() (*ttrpc.Client, error) type ttrpcConnector func() (*ttrpc.Client, error)
// ClientTTRPC is the client to interact with TTRPC part of containerd server (plugins, events) // Client is the client to interact with TTRPC part of containerd server (plugins, events)
type ClientTTRPC struct { type Client struct {
mu sync.Mutex mu sync.Mutex
connector ttrpcConnector connector ttrpcConnector
client *ttrpc.Client client *ttrpc.Client
closed bool closed bool
} }
// NewTTRPC returns a new containerd TTRPC client that is connected to the containerd instance provided by address // NewClient returns a new containerd TTRPC client that is connected to the containerd instance provided by address
func NewTTRPC(address string, opts ...ttrpc.ClientOpts) (*ClientTTRPC, error) { func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
connector := func() (*ttrpc.Client, error) { connector := func() (*ttrpc.Client, error) {
conn, err := ttrpcDial(address, ttrpcDialTimeout) conn, err := ttrpcDial(address, ttrpcDialTimeout)
if err != nil { if err != nil {
@ -54,14 +54,14 @@ func NewTTRPC(address string, opts ...ttrpc.ClientOpts) (*ClientTTRPC, error) {
return nil, err return nil, err
} }
return &ClientTTRPC{ return &Client{
connector: connector, connector: connector,
client: client, client: client,
}, nil }, nil
} }
// Reconnect re-establishes the TTRPC connection to the containerd daemon // Reconnect re-establishes the TTRPC connection to the containerd daemon
func (c *ClientTTRPC) Reconnect() error { func (c *Client) Reconnect() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -83,12 +83,12 @@ func (c *ClientTTRPC) Reconnect() error {
} }
// EventsService creates an EventsService client // EventsService creates an EventsService client
func (c *ClientTTRPC) EventsService() v1.EventsService { func (c *Client) EventsService() v1.EventsService {
return v1.NewEventsClient(c.Client()) return v1.NewEventsClient(c.Client())
} }
// Client returns the underlying TTRPC client object // Client returns the underlying TTRPC client object
func (c *ClientTTRPC) Client() *ttrpc.Client { func (c *Client) Client() *ttrpc.Client {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -96,7 +96,7 @@ func (c *ClientTTRPC) Client() *ttrpc.Client {
} }
// Close closes the clients TTRPC connection to containerd // Close closes the clients TTRPC connection to containerd
func (c *ClientTTRPC) Close() error { func (c *Client) Close() error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()

View File

@ -16,7 +16,7 @@
limitations under the License. limitations under the License.
*/ */
package containerd package ttrpcutil
import ( import (
"net" "net"

View File

@ -16,7 +16,7 @@
limitations under the License. limitations under the License.
*/ */
package containerd package ttrpcutil
import ( import (
"net" "net"

View File

@ -21,14 +21,13 @@ import (
"sync" "sync"
"time" "time"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/ttrpcutil"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
) )
const ( const (
@ -43,7 +42,7 @@ type item struct {
} }
func newPublisher(address string) (*remoteEventsPublisher, error) { func newPublisher(address string) (*remoteEventsPublisher, error) {
client, err := containerd.NewTTRPC(address) client, err := ttrpcutil.NewClient(address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -59,7 +58,7 @@ func newPublisher(address string) (*remoteEventsPublisher, error) {
} }
type remoteEventsPublisher struct { type remoteEventsPublisher struct {
client *containerd.ClientTTRPC client *ttrpcutil.Client
closed chan struct{} closed chan struct{}
closer sync.Once closer sync.Once
requeue chan *item requeue chan *item