From 047348e198cc17404e34717d00ece685bfe3fbfa Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 11 Apr 2019 11:37:01 -0400 Subject: [PATCH] Add dialer for events service Signed-off-by: Michael Crosby --- runtime/v2/shim/dialer.go | 87 +++++++++++++++++++++++++++++++++ runtime/v2/shim/publisher.go | 87 +++++++++++++++++++++++++++++++++ runtime/v2/shim/shim.go | 60 +---------------------- runtime/v2/shim/shim_unix.go | 2 +- runtime/v2/shim/shim_windows.go | 2 +- 5 files changed, 178 insertions(+), 60 deletions(-) create mode 100644 runtime/v2/shim/dialer.go create mode 100644 runtime/v2/shim/publisher.go diff --git a/runtime/v2/shim/dialer.go b/runtime/v2/shim/dialer.go new file mode 100644 index 000000000..5fdc06b23 --- /dev/null +++ b/runtime/v2/shim/dialer.go @@ -0,0 +1,87 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "net" + "sync" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/ttrpc" + "github.com/pkg/errors" +) + +type dialConnect func() (net.Conn, error) + +var errDialerClosed = errors.New("events dialer is closed") + +func newDialier(newFn dialConnect) *dialer { + return &dialer{ + newFn: newFn, + } +} + +type dialer struct { + mu sync.Mutex + + newFn dialConnect + service v1.EventsService + conn net.Conn + closed bool +} + +func (d *dialer) Get() (v1.EventsService, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.closed { + return nil, errDialerClosed + } + if d.service == nil { + conn, err := d.newFn() + if err != nil { + return nil, err + } + d.conn = conn + d.service = v1.NewEventsClient(ttrpc.NewClient(conn)) + } + return d.service, nil +} + +func (d *dialer) Put(err error) { + if err != nil { + d.mu.Lock() + d.conn.Close() + d.service = nil + d.mu.Unlock() + } +} + +func (d *dialer) Close() (err error) { + d.mu.Lock() + if d.closed { + return errDialerClosed + } + if d.conn != nil { + err = d.conn.Close() + } + d.service = nil + d.closed = true + d.mu.Unlock() + + return err +} diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go new file mode 100644 index 000000000..915d5cd4d --- /dev/null +++ b/runtime/v2/shim/publisher.go @@ -0,0 +1,87 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "context" + "net" + "sync" + "time" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/typeurl" +) + +func newPublisher(address string) *remoteEventsPublisher { + return &remoteEventsPublisher{ + dialer: newDialier(func() (net.Conn, error) { + return connect(address, dial) + }), + closed: make(chan struct{}), + } +} + +type remoteEventsPublisher struct { + dialer *dialer + closed chan struct{} + closer sync.Once +} + +func (l *remoteEventsPublisher) Done() <-chan struct{} { + return l.closed +} + +func (l *remoteEventsPublisher) Close() (err error) { + err = l.dialer.Close() + l.closer.Do(func() { + close(l.closed) + }) + return err +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + client, err := l.dialer.Get() + if err != nil { + return err + } + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + any, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + if _, err := client.Forward(ctx, &v1.ForwardRequest{ + Envelope: &v1.Envelope{ + Timestamp: time.Now(), + Namespace: ns, + Topic: topic, + Event: any, + }, + }); err != nil { + l.dialer.Put(err) + return err + } + return nil +} + +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { + return d(address, 5*time.Second) +} diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 2bc8f1b07..18937f8ef 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -21,21 +21,17 @@ import ( "flag" "fmt" "io" - "net" "os" "runtime" "runtime/debug" "strings" - "sync" "time" - v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" - "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -165,18 +161,10 @@ func run(id string, initFunc Init, config Config) error { } } address := fmt.Sprintf("%s.ttrpc", addressFlag) - conn, err := connect(address, dialer) - if err != nil { - return err - } - publisher := &remoteEventsPublisher{ - address: address, - conn: conn, - closed: make(chan struct{}), - } + + publisher := newPublisher(address) defer publisher.Close() - publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn)) if namespaceFlag == "" { return fmt.Errorf("shim namespace cannot be empty") } @@ -310,47 +298,3 @@ func dumpStacks(logger *logrus.Entry) { buf = buf[:stackSize] logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } - -type remoteEventsPublisher struct { - address string - conn net.Conn - client v1.EventsService - closed chan struct{} - closer sync.Once -} - -func (l *remoteEventsPublisher) Done() <-chan struct{} { - return l.closed -} - -func (l *remoteEventsPublisher) Close() (err error) { - l.closer.Do(func() { - err = l.conn.Close() - close(l.closed) - }) - return err -} - -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - any, err := typeurl.MarshalAny(event) - if err != nil { - return err - } - _, err = l.client.Forward(ctx, &v1.ForwardRequest{ - Envelope: &v1.Envelope{ - Timestamp: time.Now(), - Namespace: ns, - Topic: topic, - Event: any, - }, - }) - return err -} - -func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { - return d(address, 100*time.Second) -} diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index d7de92a08..603e51bfb 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -94,7 +94,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) { return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) } -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func dial(address string, timeout time.Duration) (net.Conn, error) { address = strings.TrimPrefix(address, "unix://") return net.DialTimeout("unix", address, timeout) } diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index a399da0cb..084be2b28 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -287,7 +287,7 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { return dswl, nil } -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func dial(address string, timeout time.Duration) (net.Conn, error) { var c net.Conn var lastError error timedOutError := errors.Errorf("timed out waiting for npipe %s", address)