Merge pull request #1263 from crosbymichael/no-shim-events

event forwarding without shim
This commit is contained in:
Michael Crosby 2017-07-31 14:26:18 -04:00 committed by GitHub
commit ce07fa04ac
5 changed files with 63 additions and 27 deletions

View File

@ -8,15 +8,18 @@ import (
"os" "os"
"runtime" "runtime"
"strings" "strings"
"time"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"google.golang.org/grpc" "google.golang.org/grpc"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/linux/shim" "github.com/containerd/containerd/linux/shim"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/reaper" "github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/version" "github.com/containerd/containerd/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
@ -72,10 +75,14 @@ func main() {
return err return err
} }
server := grpc.NewServer() server := grpc.NewServer()
e, err := connectEvents(context.GlobalString("address"))
if err != nil {
return err
}
sv, err := shim.NewService( sv, err := shim.NewService(
path, path,
context.GlobalString("namespace"), context.GlobalString("namespace"),
context.GlobalString("address"), e,
) )
if err != nil { if err != nil {
return err return err
@ -155,3 +162,35 @@ func dumpStacks() {
buf = buf[:stackSize] buf = buf[:stackSize]
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
func connectEvents(address string) (events.EventsClient, error) {
conn, err := connect(address, dialer)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return events.NewEventsClient(conn), nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
gopts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithTimeout(100 * time.Second),
grpc.WithDialer(d),
grpc.FailOnNonTempDialError(true),
}
conn, err := grpc.Dial(dialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
return conn, nil
}
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
func dialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}

View File

@ -9,21 +9,23 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/linux/runcopts" "github.com/containerd/containerd/linux/runcopts"
client "github.com/containerd/containerd/linux/shim" client "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/typeurl" "github.com/containerd/containerd/typeurl"
) )
func loadBundle(path, namespace string) *bundle { func loadBundle(path, namespace string, events *events.Exchange) *bundle {
return &bundle{ return &bundle{
path: path, path: path,
namespace: namespace, namespace: namespace,
events: events,
} }
} }
// newBundle creates a new bundle on disk at the provided path for the given id // newBundle creates a new bundle on disk at the provided path for the given id
func newBundle(path, namespace, id string, spec []byte) (b *bundle, err error) { func newBundle(path, namespace, id string, spec []byte, events *events.Exchange) (b *bundle, err error) {
if err := os.MkdirAll(path, 0711); err != nil { if err := os.MkdirAll(path, 0711); err != nil {
return nil, err return nil, err
} }
@ -49,6 +51,7 @@ func newBundle(path, namespace, id string, spec []byte) (b *bundle, err error) {
id: id, id: id,
path: path, path: path,
namespace: namespace, namespace: namespace,
events: events,
}, err }, err
} }
@ -56,13 +59,14 @@ type bundle struct {
id string id string
path string path string
namespace string namespace string
events *events.Exchange
} }
// NewShim connects to the shim managing the bundle and tasks // NewShim connects to the shim managing the bundle and tasks
func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts) (*client.Client, error) { func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts) (*client.Client, error) {
opt := client.WithStart(binary, grpcAddress, debug) opt := client.WithStart(binary, grpcAddress, debug)
if !remote { if !remote {
opt = client.WithLocal opt = client.WithLocal(b.events)
} }
var options runcopts.CreateOptions var options runcopts.CreateOptions
if createOpts.Options != nil { if createOpts.Options != nil {
@ -84,7 +88,7 @@ func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote
func (b *bundle) Connect(ctx context.Context, remote bool) (*client.Client, error) { func (b *bundle) Connect(ctx context.Context, remote bool) (*client.Client, error) {
opt := client.WithConnect opt := client.WithConnect
if !remote { if !remote {
opt = client.WithLocal opt = client.WithLocal(b.events)
} }
return client.New(ctx, client.Config{ return client.New(ctx, client.Config{
Address: b.shimAddress(), Address: b.shimAddress(),

View File

@ -12,6 +12,7 @@ import (
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/identifiers"
client "github.com/containerd/containerd/linux/shim" client "github.com/containerd/containerd/linux/shim"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
@ -90,6 +91,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
tasks: runtime.NewTaskList(), tasks: runtime.NewTaskList(),
db: m.(*bolt.DB), db: m.(*bolt.DB),
address: ic.Address, address: ic.Address,
events: ic.Events,
} }
tasks, err := r.restoreTasks(ic.Context) tasks, err := r.restoreTasks(ic.Context)
if err != nil { if err != nil {
@ -114,6 +116,7 @@ type Runtime struct {
monitor runtime.TaskMonitor monitor runtime.TaskMonitor
tasks *runtime.TaskList tasks *runtime.TaskList
db *bolt.DB db *bolt.DB
events *events.Exchange
} }
func (r *Runtime) ID() string { func (r *Runtime) ID() string {
@ -130,7 +133,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, errors.Wrapf(err, "invalid task id") return nil, errors.Wrapf(err, "invalid task id")
} }
bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec.Value) bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec.Value, r.events)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -203,7 +206,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
} }
r.tasks.Delete(ctx, lc) r.tasks.Delete(ctx, lc)
bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace) bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace, r.events)
if err := bundle.Delete(); err != nil { if err := bundle.Delete(); err != nil {
return nil, err return nil, err
} }
@ -254,7 +257,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue continue
} }
id := path.Name() id := path.Name()
bundle := loadBundle(filepath.Join(r.root, ns, id), ns) bundle := loadBundle(filepath.Join(r.root, ns, id), ns, r.events)
s, err := bundle.Connect(ctx, r.remote) s, err := bundle.Connect(ctx, r.remote)
if err != nil { if err != nil {

View File

@ -17,6 +17,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/containerd/containerd/events"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper" "github.com/containerd/containerd/reaper"
@ -149,12 +150,14 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer
} }
// WithLocal uses an in process shim // WithLocal uses an in process shim
func WithLocal(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) {
service, err := NewService(config.Path, config.Namespace, "") return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
if err != nil { service, err := NewService(config.Path, config.Namespace, &localEventsClient{forwarder: events})
return nil, nil, err if err != nil {
return nil, nil, err
}
return NewLocal(service), nil, nil
} }
return NewLocal(service), nil, nil
} }
type Config struct { type Config struct {

View File

@ -15,7 +15,6 @@ import (
events "github.com/containerd/containerd/api/services/events/v1" events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
evt "github.com/containerd/containerd/events"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -32,23 +31,11 @@ var empty = &google_protobuf.Empty{}
const RuncRoot = "/run/containerd/runc" const RuncRoot = "/run/containerd/runc"
// NewService returns a new shim service that can be used via GRPC // NewService returns a new shim service that can be used via GRPC
func NewService(path, namespace, address string) (*Service, error) { func NewService(path, namespace string, client publisher) (*Service, error) {
if namespace == "" { if namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty") return nil, fmt.Errorf("shim namespace cannot be empty")
} }
context := namespaces.WithNamespace(context.Background(), namespace) context := namespaces.WithNamespace(context.Background(), namespace)
var client publisher
if address != "" {
conn, err := connect(address, dialer)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
client = events.NewEventsClient(conn)
} else {
client = &localEventsClient{
forwarder: evt.NewExchange(),
}
}
s := &Service{ s := &Service{
path: path, path: path,
processes: make(map[string]process), processes: make(map[string]process),