From 7b06c9a1ce2b2d01bf3c6b13f8416db0ec4b0eca Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Mon, 13 May 2019 21:05:07 -0700 Subject: [PATCH] Add TTRPC client Signed-off-by: Maksym Pavlenko --- client_ttrpc.go | 105 ++++++++++++++++++++++++++++++++ client_ttrpc_test.go | 73 ++++++++++++++++++++++ client_ttrpc_unix.go | 30 +++++++++ client_ttrpc_windows.go | 60 ++++++++++++++++++ runtime/v2/shim/dialer.go | 87 -------------------------- runtime/v2/shim/publisher.go | 74 +++++++++++----------- runtime/v2/shim/shim.go | 8 ++- runtime/v2/shim/shim_unix.go | 7 --- runtime/v2/shim/shim_windows.go | 33 ---------- 9 files changed, 314 insertions(+), 163 deletions(-) create mode 100644 client_ttrpc.go create mode 100644 client_ttrpc_test.go create mode 100644 client_ttrpc_unix.go create mode 100644 client_ttrpc_windows.go delete mode 100644 runtime/v2/shim/dialer.go diff --git a/client_ttrpc.go b/client_ttrpc.go new file mode 100644 index 000000000..1dc249065 --- /dev/null +++ b/client_ttrpc.go @@ -0,0 +1,105 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "sync" + "time" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/ttrpc" + "github.com/pkg/errors" +) + +const ttrpcDialTimeout = 5 * time.Second + +type ttrpcConnector func() (*ttrpc.Client, error) + +// ClientTTRPC is the client to interact with TTRPC part of containerd server (plugins, events) +type ClientTTRPC struct { + mu sync.Mutex + connector ttrpcConnector + client *ttrpc.Client + closed bool +} + +// NewTTRPC returns a new containerd TTRPC client that is connected to the containerd instance provided by address +func NewTTRPC(address string, opts ...ttrpc.ClientOpts) (*ClientTTRPC, error) { + connector := func() (*ttrpc.Client, error) { + conn, err := ttrpcDial(address, ttrpcDialTimeout) + if err != nil { + return nil, errors.Wrap(err, "failed to connect") + } + + client := ttrpc.NewClient(conn, opts...) + return client, nil + } + + client, err := connector() + if err != nil { + return nil, err + } + + return &ClientTTRPC{ + connector: connector, + client: client, + }, nil +} + +// Reconnect re-establishes the TTRPC connection to the containerd daemon +func (c *ClientTTRPC) Reconnect() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.connector == nil { + return errors.New("unable to reconnect to containerd, no connector available") + } + + if c.closed { + return errors.New("client is closed") + } + + client, err := c.connector() + if err != nil { + return err + } + + c.client = client + return nil +} + +// EventsService creates an EventsService client +func (c *ClientTTRPC) EventsService() v1.EventsService { + return v1.NewEventsClient(c.Client()) +} + +// Client returns the underlying TTRPC client object +func (c *ClientTTRPC) Client() *ttrpc.Client { + c.mu.Lock() + defer c.mu.Unlock() + + return c.client +} + +// Close closes the clients TTRPC connection to containerd +func (c *ClientTTRPC) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + c.closed = true + return c.client.Close() +} diff --git a/client_ttrpc_test.go b/client_ttrpc_test.go new file mode 100644 index 000000000..ee75553dc --- /dev/null +++ b/client_ttrpc_test.go @@ -0,0 +1,73 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "context" + "testing" + "time" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/ttrpc" + "github.com/gogo/protobuf/types" + "gotest.tools/assert" +) + +func TestClientTTRPC_New(t *testing.T) { + client, err := NewTTRPC(address + ".ttrpc") + assert.NilError(t, err) + + err = client.Close() + assert.NilError(t, err) +} + +func TestClientTTRPC_Reconnect(t *testing.T) { + client, err := NewTTRPC(address + ".ttrpc") + assert.NilError(t, err) + + err = client.Reconnect() + assert.NilError(t, err) + + // Send test request to make sure its alive after reconnect + _, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{ + Envelope: &v1.Envelope{ + Timestamp: time.Now(), + Namespace: namespaces.Default, + Topic: "/test", + Event: &types.Any{}, + }, + }) + assert.NilError(t, err) + + err = client.Close() + assert.NilError(t, err) +} + +func TestClientTTRPC_Close(t *testing.T) { + client, err := NewTTRPC(address + ".ttrpc") + assert.NilError(t, err) + + err = client.Close() + assert.NilError(t, err) + + _, err = client.EventsService().Forward(context.Background(), &v1.ForwardRequest{Envelope: &v1.Envelope{}}) + assert.Equal(t, err, ttrpc.ErrClosed) + + err = client.Close() + assert.NilError(t, err) +} diff --git a/client_ttrpc_unix.go b/client_ttrpc_unix.go new file mode 100644 index 000000000..b3f0ba3b4 --- /dev/null +++ b/client_ttrpc_unix.go @@ -0,0 +1,30 @@ +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "net" + "strings" + "time" +) + +func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) +} diff --git a/client_ttrpc_windows.go b/client_ttrpc_windows.go new file mode 100644 index 000000000..b6ef39ba3 --- /dev/null +++ b/client_ttrpc_windows.go @@ -0,0 +1,60 @@ +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "net" + "os" + "time" + + winio "github.com/Microsoft/go-winio" + "github.com/pkg/errors" +) + +func ttrpcDial(address string, timeout time.Duration) (net.Conn, error) { + var c net.Conn + var lastError error + timedOutError := errors.Errorf("timed out waiting for npipe %s", address) + start := time.Now() + for { + remaining := timeout - time.Since(start) + if remaining <= 0 { + lastError = timedOutError + break + } + c, lastError = winio.DialPipe(address, &remaining) + if lastError == nil { + break + } + if !os.IsNotExist(lastError) { + break + } + // There is nobody serving the pipe. We limit the timeout for this case + // to 5 seconds because any shim that would serve this endpoint should + // serve it within 5 seconds. We use the passed in timeout for the + // `DialPipe` timeout if the pipe exists however to give the pipe time + // to `Accept` the connection. + if time.Since(start) >= 5*time.Second { + lastError = timedOutError + break + } + time.Sleep(10 * time.Millisecond) + } + return c, lastError +} diff --git a/runtime/v2/shim/dialer.go b/runtime/v2/shim/dialer.go deleted file mode 100644 index 5fdc06b23..000000000 --- a/runtime/v2/shim/dialer.go +++ /dev/null @@ -1,87 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "net" - "sync" - - v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" - "github.com/containerd/ttrpc" - "github.com/pkg/errors" -) - -type dialConnect func() (net.Conn, error) - -var errDialerClosed = errors.New("events dialer is closed") - -func newDialier(newFn dialConnect) *dialer { - return &dialer{ - newFn: newFn, - } -} - -type dialer struct { - mu sync.Mutex - - newFn dialConnect - service v1.EventsService - conn net.Conn - closed bool -} - -func (d *dialer) Get() (v1.EventsService, error) { - d.mu.Lock() - defer d.mu.Unlock() - - if d.closed { - return nil, errDialerClosed - } - if d.service == nil { - conn, err := d.newFn() - if err != nil { - return nil, err - } - d.conn = conn - d.service = v1.NewEventsClient(ttrpc.NewClient(conn)) - } - return d.service, nil -} - -func (d *dialer) Put(err error) { - if err != nil { - d.mu.Lock() - d.conn.Close() - d.service = nil - d.mu.Unlock() - } -} - -func (d *dialer) Close() (err error) { - d.mu.Lock() - if d.closed { - return errDialerClosed - } - if d.conn != nil { - err = d.conn.Close() - } - d.service = nil - d.closed = true - d.mu.Unlock() - - return err -} diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go index da83201e3..4c3519388 100644 --- a/runtime/v2/shim/publisher.go +++ b/runtime/v2/shim/publisher.go @@ -18,15 +18,17 @@ package shim import ( "context" - "net" "sync" "time" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl" + "github.com/sirupsen/logrus" + + "github.com/containerd/containerd" v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" - "github.com/containerd/typeurl" - "github.com/sirupsen/logrus" ) const ( @@ -40,20 +42,24 @@ type item struct { count int } -func newPublisher(address string) *remoteEventsPublisher { +func newPublisher(address string) (*remoteEventsPublisher, error) { + client, err := containerd.NewTTRPC(address) + if err != nil { + return nil, err + } + l := &remoteEventsPublisher{ - dialer: newDialier(func() (net.Conn, error) { - return connect(address, dial) - }), + client: client, closed: make(chan struct{}), requeue: make(chan *item, queueSize), } + go l.processQueue() - return l + return l, nil } type remoteEventsPublisher struct { - dialer *dialer + client *containerd.ClientTTRPC closed chan struct{} closer sync.Once requeue chan *item @@ -64,7 +70,7 @@ func (l *remoteEventsPublisher) Done() <-chan struct{} { } func (l *remoteEventsPublisher) Close() (err error) { - err = l.dialer.Close() + err = l.client.Close() l.closer.Do(func() { close(l.closed) }) @@ -79,18 +85,7 @@ func (l *remoteEventsPublisher) processQueue() { continue } - client, err := l.dialer.Get() - if err != nil { - l.dialer.Put(err) - - l.queue(i) - logrus.WithError(err).Error("get events client") - continue - } - if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ - Envelope: i.ev, - }); err != nil { - l.dialer.Put(err) + if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil { logrus.WithError(err).Error("forward event") l.queue(i) } @@ -124,22 +119,33 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event }, ctx: ctx, } - client, err := l.dialer.Get() - if err != nil { - l.dialer.Put(err) - l.queue(i) - return err - } - if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ - Envelope: i.ev, - }); err != nil { - l.dialer.Put(err) + + if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil { l.queue(i) return err } + return nil } -func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { - return d(address, 5*time.Second) +func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error { + _, err := l.client.EventsService().Forward(ctx, req) + if err == nil { + return nil + } + + if err != ttrpc.ErrClosed { + return err + } + + // Reconnect and retry request + if err := l.client.Reconnect(); err != nil { + return err + } + + if _, err := l.client.EventsService().Forward(ctx, req); err != nil { + return err + } + + return nil } diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 6b4d8b8a9..09d3b6018 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -162,9 +162,13 @@ func run(id string, initFunc Init, config Config) error { return err } } - address := fmt.Sprintf("%s.ttrpc", addressFlag) - publisher := newPublisher(address) + address := fmt.Sprintf("%s.ttrpc", addressFlag) + publisher, err := newPublisher(address) + if err != nil { + return err + } + defer publisher.Close() if namespaceFlag == "" { diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index 603e51bfb..dc3e6a891 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -24,9 +24,7 @@ import ( "net" "os" "os/signal" - "strings" "syscall" - "time" "github.com/containerd/fifo" "github.com/pkg/errors" @@ -93,8 +91,3 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si func openLog(ctx context.Context, _ string) (io.Writer, error) { return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) } - -func dial(address string, timeout time.Duration) (net.Conn, error) { - address = strings.TrimPrefix(address, "unix://") - return net.DialTimeout("unix", address, timeout) -} diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index 084be2b28..505d734f1 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -26,7 +26,6 @@ import ( "net" "os" "sync" - "time" "unsafe" winio "github.com/Microsoft/go-winio" @@ -286,35 +285,3 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { go dswl.beginAccept() return dswl, nil } - -func dial(address string, timeout time.Duration) (net.Conn, error) { - var c net.Conn - var lastError error - timedOutError := errors.Errorf("timed out waiting for npipe %s", address) - start := time.Now() - for { - remaining := timeout - time.Since(start) - if remaining <= 0 { - lastError = timedOutError - break - } - c, lastError = winio.DialPipe(address, &remaining) - if lastError == nil { - break - } - if !os.IsNotExist(lastError) { - break - } - // There is nobody serving the pipe. We limit the timeout for this case - // to 5 seconds because any shim that would serve this endpoint should - // serve it within 5 seconds. We use the passed in timeout for the - // `DialPipe` timeout if the pipe exists however to give the pipe time - // to `Accept` the connection. - if time.Since(start) >= 5*time.Second { - lastError = timedOutError - break - } - time.Sleep(10 * time.Millisecond) - } - return c, lastError -}