Add TTRPC client

Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
This commit is contained in:
Maksym Pavlenko 2019-05-13 21:05:07 -07:00
parent 57fbb16234
commit 7b06c9a1ce
9 changed files with 314 additions and 163 deletions

105
client_ttrpc.go Normal file
View File

@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"sync"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
)
const ttrpcDialTimeout = 5 * time.Second
type ttrpcConnector func() (*ttrpc.Client, error)
// ClientTTRPC is the client to interact with TTRPC part of containerd server (plugins, events)
type ClientTTRPC struct {
mu sync.Mutex
connector ttrpcConnector
client *ttrpc.Client
closed bool
}
// NewTTRPC returns a new containerd TTRPC client that is connected to the containerd instance provided by address
func NewTTRPC(address string, opts ...ttrpc.ClientOpts) (*ClientTTRPC, error) {
connector := func() (*ttrpc.Client, error) {
conn, err := ttrpcDial(address, ttrpcDialTimeout)
if err != nil {
return nil, errors.Wrap(err, "failed to connect")
}
client := ttrpc.NewClient(conn, opts...)
return client, nil
}
client, err := connector()
if err != nil {
return nil, err
}
return &ClientTTRPC{
connector: connector,
client: client,
}, nil
}
// Reconnect re-establishes the TTRPC connection to the containerd daemon
func (c *ClientTTRPC) Reconnect() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.connector == nil {
return errors.New("unable to reconnect to containerd, no connector available")
}
if c.closed {
return errors.New("client is closed")
}
client, err := c.connector()
if err != nil {
return err
}
c.client = client
return nil
}
// EventsService creates an EventsService client
func (c *ClientTTRPC) EventsService() v1.EventsService {
return v1.NewEventsClient(c.Client())
}
// Client returns the underlying TTRPC client object
func (c *ClientTTRPC) Client() *ttrpc.Client {
c.mu.Lock()
defer c.mu.Unlock()
return c.client
}
// Close closes the clients TTRPC connection to containerd
func (c *ClientTTRPC) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
return c.client.Close()
}

73
client_ttrpc_test.go Normal file
View File

@ -0,0 +1,73 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"context"
"testing"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/ttrpc"
"github.com/gogo/protobuf/types"
"gotest.tools/assert"
)
func TestClientTTRPC_New(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc")
assert.NilError(t, err)
err = client.Close()
assert.NilError(t, err)
}
func TestClientTTRPC_Reconnect(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc")
assert.NilError(t, err)
err = client.Reconnect()
assert.NilError(t, err)
// Send test request to make sure its alive after reconnect
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: namespaces.Default,
Topic: "/test",
Event: &types.Any{},
},
})
assert.NilError(t, err)
err = client.Close()
assert.NilError(t, err)
}
func TestClientTTRPC_Close(t *testing.T) {
client, err := NewTTRPC(address + ".ttrpc")
assert.NilError(t, err)
err = client.Close()
assert.NilError(t, err)
_, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}})
assert.Equal(t, err, ttrpc.ErrClosed)
err = client.Close()
assert.NilError(t, err)
}

30
client_ttrpc_unix.go Normal file
View File

@ -0,0 +1,30 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"net"
"strings"
"time"
)
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}

60
client_ttrpc_windows.go Normal file
View File

@ -0,0 +1,60 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"net"
"os"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/pkg/errors"
)
func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Since(start)
if remaining <= 0 {
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
if lastError == nil {
break
}
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Since(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
return c, lastError
}

View File

@ -1,87 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"net"
"sync"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
)
type dialConnect func() (net.Conn, error)
var errDialerClosed = errors.New("events dialer is closed")
func newDialier(newFn dialConnect) *dialer {
return &dialer{
newFn: newFn,
}
}
type dialer struct {
mu sync.Mutex
newFn dialConnect
service v1.EventsService
conn net.Conn
closed bool
}
func (d *dialer) Get() (v1.EventsService, error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.closed {
return nil, errDialerClosed
}
if d.service == nil {
conn, err := d.newFn()
if err != nil {
return nil, err
}
d.conn = conn
d.service = v1.NewEventsClient(ttrpc.NewClient(conn))
}
return d.service, nil
}
func (d *dialer) Put(err error) {
if err != nil {
d.mu.Lock()
d.conn.Close()
d.service = nil
d.mu.Unlock()
}
}
func (d *dialer) Close() (err error) {
d.mu.Lock()
if d.closed {
return errDialerClosed
}
if d.conn != nil {
err = d.conn.Close()
}
d.service = nil
d.closed = true
d.mu.Unlock()
return err
}

View File

@ -18,15 +18,17 @@ package shim
import (
"context"
"net"
"sync"
"time"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
)
const (
@ -40,20 +42,24 @@ type item struct {
count int
}
func newPublisher(address string) *remoteEventsPublisher {
func newPublisher(address string) (*remoteEventsPublisher, error) {
client, err := containerd.NewTTRPC(address)
if err != nil {
return nil, err
}
l := &remoteEventsPublisher{
dialer: newDialier(func() (net.Conn, error) {
return connect(address, dial)
}),
client: client,
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
}
go l.processQueue()
return l
return l, nil
}
type remoteEventsPublisher struct {
dialer *dialer
client *containerd.ClientTTRPC
closed chan struct{}
closer sync.Once
requeue chan *item
@ -64,7 +70,7 @@ func (l *remoteEventsPublisher) Done() <-chan struct{} {
}
func (l *remoteEventsPublisher) Close() (err error) {
err = l.dialer.Close()
err = l.client.Close()
l.closer.Do(func() {
close(l.closed)
})
@ -79,18 +85,7 @@ func (l *remoteEventsPublisher) processQueue() {
continue
}
client, err := l.dialer.Get()
if err != nil {
l.dialer.Put(err)
l.queue(i)
logrus.WithError(err).Error("get events client")
continue
}
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil {
l.dialer.Put(err)
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
logrus.WithError(err).Error("forward event")
l.queue(i)
}
@ -124,22 +119,33 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
},
ctx: ctx,
}
client, err := l.dialer.Get()
if err != nil {
l.dialer.Put(err)
l.queue(i)
return err
}
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil {
l.dialer.Put(err)
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
l.queue(i)
return err
}
return nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 5*time.Second)
func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
_, err := l.client.EventsService().Forward(ctx, req)
if err == nil {
return nil
}
if err != ttrpc.ErrClosed {
return err
}
// Reconnect and retry request
if err := l.client.Reconnect(); err != nil {
return err
}
if _, err := l.client.EventsService().Forward(ctx, req); err != nil {
return err
}
return nil
}

View File

@ -162,9 +162,13 @@ func run(id string, initFunc Init, config Config) error {
return err
}
}
address := fmt.Sprintf("%s.ttrpc", addressFlag)
publisher := newPublisher(address)
address := fmt.Sprintf("%s.ttrpc", addressFlag)
publisher, err := newPublisher(address)
if err != nil {
return err
}
defer publisher.Close()
if namespaceFlag == "" {

View File

@ -24,9 +24,7 @@ import (
"net"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/containerd/fifo"
"github.com/pkg/errors"
@ -93,8 +91,3 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
}
func dial(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}

View File

@ -26,7 +26,6 @@ import (
"net"
"os"
"sync"
"time"
"unsafe"
winio "github.com/Microsoft/go-winio"
@ -286,35 +285,3 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
go dswl.beginAccept()
return dswl, nil
}
func dial(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Since(start)
if remaining <= 0 {
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
if lastError == nil {
break
}
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Since(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
return c, lastError
}