From 71ef776082bbe6b65f53a172bbe6b69d5d04bde7 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 10 Dec 2015 12:30:04 -0800 Subject: [PATCH] Add events support in client Signed-off-by: Michael Crosby --- api/grpc/server/server.go | 2 ++ ctr/events.go | 34 ++++++++++++++++++++++++++++++++++ ctr/main.go | 1 + delete.go | 6 ++++++ exit.go | 10 ++++++---- stats.go | 8 +++++--- supervisor.go | 29 ++++++++++++++++------------- 7 files changed, 70 insertions(+), 20 deletions(-) create mode 100644 ctr/events.go diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 9ccca9220..5a345b1b0 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -217,12 +217,14 @@ func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContaine func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error { events := s.sv.Events() + defer s.sv.Unsubscribe(events) for evt := range events { switch evt.Type { case containerd.ExitEventType: ev := &types.Event{ Type: "exit", Id: evt.ID, + Pid: uint32(evt.Pid), Status: uint32(evt.Status), } if err := stream.Send(ev); err != nil { diff --git a/ctr/events.go b/ctr/events.go new file mode 100644 index 000000000..fd00a92c2 --- /dev/null +++ b/ctr/events.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "os" + "text/tabwriter" + + "github.com/codegangsta/cli" + "github.com/docker/containerd/api/grpc/types" + netcontext "golang.org/x/net/context" +) + +var EventsCommand = cli.Command{ + Name: "events", + Usage: "receive events from the containerd daemon", + Action: func(context *cli.Context) { + c := getClient() + events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) + if err != nil { + fatal(err.Error(), 1) + } + w := tabwriter.NewWriter(os.Stdout, 20, 1, 3, ' ', 0) + fmt.Fprint(w, "TYPE\tID\tPID\tSTATUS\n") + w.Flush() + for { + e, err := events.Recv() + if err != nil { + fatal(err.Error(), 1) + } + fmt.Fprintf(w, "%s\t%s\t%d\t%d\n", e.Type, e.Id, e.Pid, e.Status) + w.Flush() + } + }, +} diff --git a/ctr/main.go b/ctr/main.go index 4a2d8f151..3575f1724 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -36,6 +36,7 @@ func main() { app.Commands = []cli.Command{ ContainersCommand, CheckpointCommand, + EventsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/delete.go b/delete.go index c36a2792e..dd30d1859 100644 --- a/delete.go +++ b/delete.go @@ -14,6 +14,12 @@ func (h *DeleteEvent) Handle(e *Event) error { if err := h.deleteContainer(container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } + h.s.NotifySubscribers(&Event{ + Type: ExitEventType, + ID: e.ID, + Status: e.Status, + Pid: e.Pid, + }) ContainersCounter.Dec(1) h.s.containerGroup.Done() } diff --git a/exit.go b/exit.go index 25d0c339f..1cc309c9d 100644 --- a/exit.go +++ b/exit.go @@ -6,10 +6,6 @@ type ExitEvent struct { s *Supervisor } -type ExecExitEvent struct { - s *Supervisor -} - func (h *ExitEvent) Handle(e *Event) error { logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") @@ -34,10 +30,16 @@ func (h *ExitEvent) Handle(e *Event) error { container.SetExited(e.Status) ne := NewEvent(DeleteEventType) ne.ID = container.ID() + ne.Pid = e.Pid + ne.Status = e.Status h.s.SendEvent(ne) return nil } +type ExecExitEvent struct { + s *Supervisor +} + func (h *ExecExitEvent) Handle(e *Event) error { // exec process: we remove this process without notifying the main event loop container := h.s.processes[e.Pid] diff --git a/stats.go b/stats.go index 41ae8939c..4f4a21b79 100644 --- a/stats.go +++ b/stats.go @@ -3,9 +3,10 @@ package containerd import "github.com/rcrowley/go-metrics" var ( - ContainerStartTimer = metrics.NewTimer() - ContainersCounter = metrics.NewCounter() - EventsCounter = metrics.NewCounter() + ContainerStartTimer = metrics.NewTimer() + ContainersCounter = metrics.NewCounter() + EventsCounter = metrics.NewCounter() + EventSubscriberCounter = metrics.NewCounter() ) func Metrics() map[string]interface{} { @@ -13,5 +14,6 @@ func Metrics() map[string]interface{} { "container-start-time": ContainerStartTimer, "containers": ContainersCounter, "events": EventsCounter, + "events-subscribers": EventSubscriberCounter, } } diff --git a/supervisor.go b/supervisor.go index b3938b2c2..1af293be9 100644 --- a/supervisor.go +++ b/supervisor.go @@ -28,13 +28,14 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err return nil, err } s := &Supervisor{ - stateDir: stateDir, - containers: make(map[string]runtime.Container), - processes: make(map[int]runtime.Container), - runtime: r, - tasks: tasks, - events: make(chan *Event, 2048), - machine: machine, + stateDir: stateDir, + containers: make(map[string]runtime.Container), + processes: make(map[int]runtime.Container), + runtime: r, + tasks: tasks, + events: make(chan *Event, 2048), + machine: machine, + subscribers: make(map[chan *Event]struct{}), } // register default event handlers s.handlers = map[EventType]Handler{ @@ -62,13 +63,11 @@ type Supervisor struct { runtime runtime.Runtime events chan *Event tasks chan *StartTask - subscribers map[subscriber]bool + subscribers map[chan *Event]struct{} machine Machine containerGroup sync.WaitGroup } -type subscriber chan *Event - func (s *Supervisor) Stop(sig chan os.Signal) { // Close the tasks channel so that no new containers get started close(s.tasks) @@ -109,12 +108,16 @@ func (s *Supervisor) Close() error { return nil } -func (s *Supervisor) Events() subscriber { - return subscriber(make(chan *Event)) +func (s *Supervisor) Events() chan *Event { + c := make(chan *Event, 2048) + EventSubscriberCounter.Inc(1) + s.subscribers[c] = struct{}{} + return c } -func (s *Supervisor) Unsubscribe(sub subscriber) { +func (s *Supervisor) Unsubscribe(sub chan *Event) { delete(s.subscribers, sub) + EventSubscriberCounter.Dec(1) } func (s *Supervisor) NotifySubscribers(e *Event) {