Add dialer for events service

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-04-11 11:37:01 -04:00
parent ae87730ad2
commit 047348e198
5 changed files with 178 additions and 60 deletions

87
runtime/v2/shim/dialer.go Normal file
View File

@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"net"
"sync"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
)
type dialConnect func() (net.Conn, error)
var errDialerClosed = errors.New("events dialer is closed")
func newDialier(newFn dialConnect) *dialer {
return &dialer{
newFn: newFn,
}
}
type dialer struct {
mu sync.Mutex
newFn dialConnect
service v1.EventsService
conn net.Conn
closed bool
}
func (d *dialer) Get() (v1.EventsService, error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.closed {
return nil, errDialerClosed
}
if d.service == nil {
conn, err := d.newFn()
if err != nil {
return nil, err
}
d.conn = conn
d.service = v1.NewEventsClient(ttrpc.NewClient(conn))
}
return d.service, nil
}
func (d *dialer) Put(err error) {
if err != nil {
d.mu.Lock()
d.conn.Close()
d.service = nil
d.mu.Unlock()
}
}
func (d *dialer) Close() (err error) {
d.mu.Lock()
if d.closed {
return errDialerClosed
}
if d.conn != nil {
err = d.conn.Close()
}
d.service = nil
d.closed = true
d.mu.Unlock()
return err
}

View File

@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"net"
"sync"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
)
func newPublisher(address string) *remoteEventsPublisher {
return &remoteEventsPublisher{
dialer: newDialier(func() (net.Conn, error) {
return connect(address, dial)
}),
closed: make(chan struct{}),
}
}
type remoteEventsPublisher struct {
dialer *dialer
closed chan struct{}
closer sync.Once
}
func (l *remoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}
func (l *remoteEventsPublisher) Close() (err error) {
err = l.dialer.Close()
l.closer.Do(func() {
close(l.closed)
})
return err
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
client, err := l.dialer.Get()
if err != nil {
return err
}
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
if _, err := client.Forward(ctx, &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
}); err != nil {
l.dialer.Put(err)
return err
}
return nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 5*time.Second)
}

View File

@ -21,21 +21,17 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"net"
"os" "os"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync"
"time" "time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
shimapi "github.com/containerd/containerd/runtime/v2/task" shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -165,18 +161,10 @@ func run(id string, initFunc Init, config Config) error {
} }
} }
address := fmt.Sprintf("%s.ttrpc", addressFlag) address := fmt.Sprintf("%s.ttrpc", addressFlag)
conn, err := connect(address, dialer)
if err != nil { publisher := newPublisher(address)
return err
}
publisher := &remoteEventsPublisher{
address: address,
conn: conn,
closed: make(chan struct{}),
}
defer publisher.Close() defer publisher.Close()
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
if namespaceFlag == "" { if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty") return fmt.Errorf("shim namespace cannot be empty")
} }
@ -310,47 +298,3 @@ func dumpStacks(logger *logrus.Entry) {
buf = buf[:stackSize] buf = buf[:stackSize]
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
type remoteEventsPublisher struct {
address string
conn net.Conn
client v1.EventsService
closed chan struct{}
closer sync.Once
}
func (l *remoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}
func (l *remoteEventsPublisher) Close() (err error) {
l.closer.Do(func() {
err = l.conn.Close()
close(l.closed)
})
return err
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
_, err = l.client.Forward(ctx, &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
})
return err
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}

View File

@ -94,7 +94,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
} }
func dialer(address string, timeout time.Duration) (net.Conn, error) { func dial(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://") address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout) return net.DialTimeout("unix", address, timeout)
} }

View File

@ -287,7 +287,7 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
return dswl, nil return dswl, nil
} }
func dialer(address string, timeout time.Duration) (net.Conn, error) { func dial(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn var c net.Conn
var lastError error var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address) timedOutError := errors.Errorf("timed out waiting for npipe %s", address)