Use ttrpc to publish runtime v2 events

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2019-04-09 13:08:35 -04:00
parent a8a805cad3
commit a6f587e4c4
10 changed files with 90 additions and 991 deletions

View File

@@ -117,7 +117,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
"-publish-binary", containerdBinary,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd

View File

@@ -133,7 +133,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
"-publish-binary", containerdBinary,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd

View File

@@ -20,17 +20,20 @@ import (
"context"
"flag"
"fmt"
"net"
"os"
"runtime"
"runtime/debug"
"strings"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -153,11 +156,16 @@ func run(id string, initFunc Init, config Config) error {
return err
}
}
publisher := &remoteEventsPublisher{
address: addressFlag,
containerdBinaryPath: containerdBinaryFlag,
noReaper: config.NoReaper,
address: fmt.Sprintf("%s.ttrpc", addressFlag),
}
conn, err := connect(publisher.address, dialer)
if err != nil {
return err
}
defer conn.Close()
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty")
}
@@ -282,7 +290,22 @@ func dumpStacks(logger *logrus.Entry) {
}
type remoteEventsPublisher struct {
address string
containerdBinaryPath string
noReaper bool
address string
client v1.EventsService
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
_, err = l.client.Publish(ctx, &v1.PublishRequest{
Topic: topic,
Event: any,
})
return err
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}

View File

@@ -19,31 +19,21 @@
package shim
import (
"bytes"
"context"
"io"
"net"
"os"
"os/exec"
"os/signal"
"sync"
"strings"
"syscall"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/fifo"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals(config Config) (chan os.Signal, error) {
@@ -101,41 +91,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
b := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(b)
cmd.Stdout = b
cmd.Stderr = b
if l.noReaper {
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return errors.Wrapf(err, "failed to publish event: %s", b.String())
}
return nil
}
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return errors.Wrapf(err, "failed to publish event: %s", b.String())
}
if status != 0 {
return errors.Errorf("failed to publish event: %s", b.String())
}
return nil
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}

View File

@@ -25,15 +25,13 @@ import (
"io"
"net"
"os"
"os/exec"
"sync"
"time"
"unsafe"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
@@ -286,17 +284,34 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
return dswl, nil
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
func dialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Since(start)
if remaining <= 0 {
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
if lastError == nil {
break
}
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Since(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
return cmd.Run()
return c, lastError
}