linux/shim: use events.Publisher interface

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-07-31 13:27:45 -07:00
parent 92d737f4ae
commit 7ed88c1e36
No known key found for this signature in database
GPG Key ID: 67B3DED84EDC823F
4 changed files with 54 additions and 60 deletions

View File

@ -3,6 +3,7 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -10,18 +11,19 @@ import (
"strings" "strings"
"time" "time"
"golang.org/x/sys/unix" eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"google.golang.org/grpc" "github.com/containerd/containerd/events"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/linux/shim" "github.com/containerd/containerd/linux/shim"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/reaper" "github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/typeurl"
"github.com/containerd/containerd/version" "github.com/containerd/containerd/version"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/urfave/cli" "github.com/urfave/cli"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
) )
const usage = ` const usage = `
@ -82,7 +84,7 @@ func main() {
sv, err := shim.NewService( sv, err := shim.NewService(
path, path,
context.GlobalString("namespace"), context.GlobalString("namespace"),
e, &remoteEventsPublisher{client: e},
) )
if err != nil { if err != nil {
return err return err
@ -163,12 +165,12 @@ func dumpStacks() {
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
func connectEvents(address string) (events.EventsClient, error) { func connectEvents(address string) (eventsapi.EventsClient, error) {
conn, err := connect(address, dialer) conn, err := connect(address, dialer)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address) return nil, errors.Wrapf(err, "failed to dial %q", address)
} }
return events.NewEventsClient(conn), nil return eventsapi.NewEventsClient(conn), nil
} }
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
@ -194,3 +196,21 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
func dialAddress(address string) string { func dialAddress(address string) string {
return fmt.Sprintf("unix://%s", address) return fmt.Sprintf("unix://%s", address)
} }
type remoteEventsPublisher struct {
client eventsapi.EventsClient
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
if _, err := l.client.Publish(ctx, &eventsapi.PublishRequest{
Topic: topic,
Event: encoded,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}

View File

@ -150,9 +150,9 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer
} }
// WithLocal uses an in process shim // WithLocal uses an in process shim
func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) {
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
service, err := NewService(config.Path, config.Namespace, &localEventsClient{publisher: events}) service, err := NewService(config.Path, config.Namespace, publisher)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -5,8 +5,6 @@ package shim
import ( import (
"path/filepath" "path/filepath"
events "github.com/containerd/containerd/api/services/events/v1"
evt "github.com/containerd/containerd/events"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -88,18 +86,3 @@ func (c *local) ShimInfo(ctx context.Context, in *google_protobuf.Empty, opts ..
func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
return c.s.Update(ctx, in) return c.s.Update(ctx, in)
} }
type publisher interface {
Publish(ctx context.Context, in *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error)
}
type localEventsClient struct {
publisher evt.Publisher
}
func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
if err := l.publisher.Publish(ctx, r.Topic, r.Event); err != nil {
return nil, err
}
return empty, nil
}

View File

@ -11,15 +11,15 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"github.com/containerd/console" "github.com/containerd/console"
events "github.com/containerd/containerd/api/services/events/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/reaper" "github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/typeurl"
google_protobuf "github.com/golang/protobuf/ptypes/empty" google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -30,7 +30,7 @@ var empty = &google_protobuf.Empty{}
const RuncRoot = "/run/containerd/runc" const RuncRoot = "/run/containerd/runc"
// NewService returns a new shim service that can be used via GRPC // NewService returns a new shim service that can be used via GRPC
func NewService(path, namespace string, client publisher) (*Service, error) { func NewService(path, namespace string, publisher events.Publisher) (*Service, error) {
if namespace == "" { if namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty") return nil, fmt.Errorf("shim namespace cannot be empty")
} }
@ -45,7 +45,7 @@ func NewService(path, namespace string, client publisher) (*Service, error) {
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")
} }
go s.forward(client) go s.forward(publisher)
return s, nil return s, nil
} }
@ -89,11 +89,11 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
ExitCh: make(chan int, 1), ExitCh: make(chan int, 1),
} }
reaper.Default.Register(pid, cmd) reaper.Default.Register(pid, cmd)
s.events <- &events.TaskCreate{ s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID, ContainerID: r.ID,
Bundle: r.Bundle, Bundle: r.Bundle,
Rootfs: r.Rootfs, Rootfs: r.Rootfs,
IO: &events.TaskIO{ IO: &eventsapi.TaskIO{
Stdin: r.Stdin, Stdin: r.Stdin,
Stdout: r.Stdout, Stdout: r.Stdout,
Stderr: r.Stderr, Stderr: r.Stderr,
@ -115,7 +115,7 @@ func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_
if err := s.initProcess.Start(ctx); err != nil { if err := s.initProcess.Start(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &events.TaskStart{ s.events <- &eventsapi.TaskStart{
ContainerID: s.id, ContainerID: s.id,
Pid: uint32(s.initProcess.Pid()), Pid: uint32(s.initProcess.Pid()),
} }
@ -132,7 +132,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
s.mu.Lock() s.mu.Lock()
delete(s.processes, p.ID()) delete(s.processes, p.ID())
s.mu.Unlock() s.mu.Unlock()
s.events <- &events.TaskDelete{ s.events <- &eventsapi.TaskDelete{
ContainerID: s.id, ContainerID: s.id,
ExitStatus: uint32(p.Status()), ExitStatus: uint32(p.Status()),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
@ -188,7 +188,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
reaper.Default.Register(pid, cmd) reaper.Default.Register(pid, cmd)
s.processes[r.ID] = process s.processes[r.ID] = process
s.events <- &events.TaskExecAdded{ s.events <- &eventsapi.TaskExecAdded{
ContainerID: s.id, ContainerID: s.id,
ExecID: r.ID, ExecID: r.ID,
Pid: uint32(pid), Pid: uint32(pid),
@ -262,7 +262,7 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
if err := s.initProcess.Pause(ctx); err != nil { if err := s.initProcess.Pause(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &events.TaskPaused{ s.events <- &eventsapi.TaskPaused{
ContainerID: s.id, ContainerID: s.id,
} }
return empty, nil return empty, nil
@ -275,7 +275,7 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
if err := s.initProcess.Resume(ctx); err != nil { if err := s.initProcess.Resume(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &events.TaskResumed{ s.events <- &eventsapi.TaskResumed{
ContainerID: s.id, ContainerID: s.id,
} }
return empty, nil return empty, nil
@ -329,7 +329,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque
if err := s.initProcess.Checkpoint(ctx, r); err != nil { if err := s.initProcess.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
s.events <- &events.TaskCheckpointed{ s.events <- &eventsapi.TaskCheckpointed{
ContainerID: s.id, ContainerID: s.id,
} }
return empty, nil return empty, nil
@ -356,7 +356,7 @@ func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
p.Exited(status) p.Exited(status)
reaper.Default.Delete(pid) reaper.Default.Delete(pid)
s.events <- &events.TaskExit{ s.events <- &eventsapi.TaskExit{
ContainerID: s.id, ContainerID: s.id,
ID: p.ID(), ID: p.ID(),
Pid: uint32(pid), Pid: uint32(pid),
@ -377,18 +377,9 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil return pids, nil
} }
func (s *Service) forward(client publisher) { func (s *Service) forward(publisher events.Publisher) {
for e := range s.events { for e := range s.events {
a, err := typeurl.MarshalAny(e) if err := publisher.Publish(s.context, getTopic(e), e); err != nil {
if err != nil {
log.G(s.context).WithError(err).Error("marshal event")
continue
}
if _, err := client.Publish(s.context, &events.PublishRequest{
Topic: getTopic(e),
Event: a,
}); err != nil {
log.G(s.context).WithError(err).Error("post event") log.G(s.context).WithError(err).Error("post event")
} }
} }
@ -396,23 +387,23 @@ func (s *Service) forward(client publisher) {
func getTopic(e interface{}) string { func getTopic(e interface{}) string {
switch e.(type) { switch e.(type) {
case *events.TaskCreate: case *eventsapi.TaskCreate:
return runtime.TaskCreateEventTopic return runtime.TaskCreateEventTopic
case *events.TaskStart: case *eventsapi.TaskStart:
return runtime.TaskStartEventTopic return runtime.TaskStartEventTopic
case *events.TaskOOM: case *eventsapi.TaskOOM:
return runtime.TaskOOMEventTopic return runtime.TaskOOMEventTopic
case *events.TaskExit: case *eventsapi.TaskExit:
return runtime.TaskExitEventTopic return runtime.TaskExitEventTopic
case *events.TaskDelete: case *eventsapi.TaskDelete:
return runtime.TaskDeleteEventTopic return runtime.TaskDeleteEventTopic
case *events.TaskExecAdded: case *eventsapi.TaskExecAdded:
return runtime.TaskExecAddedEventTopic return runtime.TaskExecAddedEventTopic
case *events.TaskPaused: case *eventsapi.TaskPaused:
return runtime.TaskPausedEventTopic return runtime.TaskPausedEventTopic
case *events.TaskResumed: case *eventsapi.TaskResumed:
return runtime.TaskResumedEventTopic return runtime.TaskResumedEventTopic
case *events.TaskCheckpointed: case *eventsapi.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic return runtime.TaskCheckpointedEventTopic
} }
return "?" return "?"