diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index ceacf0c1b..3e2b2fb10 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -13,14 +13,17 @@ import ( "strings" "syscall" + gocontext "golang.org/x/net/context" "google.golang.org/grpc" - "github.com/Sirupsen/logrus" "github.com/docker/containerd" api "github.com/docker/containerd/api/execution" + "github.com/docker/containerd/events" "github.com/docker/containerd/execution" "github.com/docker/containerd/execution/executors/oci" + "github.com/docker/containerd/log" metrics "github.com/docker/go-metrics" + "github.com/sirupsen/logrus" "github.com/urfave/cli" "github.com/nats-io/go-nats" @@ -85,22 +88,10 @@ high performance container runtime go serveMetrics(address) } - eventsURL, err := url.Parse(context.GlobalString("events-address")) + s, err := startNATSServer(context) if err != nil { - return err + return nil } - - no := stand.DefaultNatsServerOptions - nOpts := &no - nOpts.NoSigs = true - parts := strings.Split(eventsURL.Host, ":") - nOpts.Host = parts[0] - if len(parts) == 2 { - nOpts.Port, err = strconv.Atoi(parts[1]) - } else { - nOpts.Port = nats.DefaultPort - } - s := stand.RunServerWithOpts(nil, nOpts) defer s.Shutdown() path := context.GlobalString("socket") @@ -121,24 +112,31 @@ high performance container runtime } } - // Start events listener - nc, err := nats.Connect(context.GlobalString("events-address")) + // Get events publisher + nec, err := getNATSPublisher(context) if err != nil { return err } - nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - nc.Close() - return err - } defer nec.Close() - execService, err := execution.New(executor, nec) + execService, err := execution.New(executor) if err != nil { return err } - server := grpc.NewServer() + // Intercept the GRPC call in order to populate the correct module path + interceptor := func(ctx gocontext.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ctx = log.WithModule(ctx, "containerd") + switch info.Server.(type) { + case api.ExecutionServiceServer: + ctx = log.WithModule(ctx, "execution") + ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + default: + fmt.Println("Unknown type: %#v", info.Server) + } + return handler(ctx, req) + } + server := grpc.NewServer(grpc.UnaryInterceptor(interceptor)) api.RegisterExecutionServiceServer(server, execService) go serveGRPC(server, l) @@ -201,3 +199,48 @@ func dumpStacks() { buf = buf[:stackSize] logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } + +func startNATSServer(context *cli.Context) (e *stand.StanServer, err error) { + eventsURL, err := url.Parse(context.GlobalString("events-address")) + if err != nil { + return nil, err + } + + no := stand.DefaultNatsServerOptions + nOpts := &no + nOpts.NoSigs = true + parts := strings.Split(eventsURL.Host, ":") + nOpts.Host = parts[0] + if len(parts) == 2 { + nOpts.Port, err = strconv.Atoi(parts[1]) + } else { + nOpts.Port = nats.DefaultPort + } + defer func() { + if r := recover(); r != nil { + e = nil + if _, ok := r.(error); !ok { + err = fmt.Errorf("failed to start NATS server: %v", r) + } else { + err = r.(error) + } + } + }() + s := stand.RunServerWithOpts(nil, nOpts) + + return s, nil +} + +func getNATSPublisher(context *cli.Context) (*nats.EncodedConn, error) { + nc, err := nats.Connect(context.GlobalString("events-address")) + if err != nil { + return nil, err + } + nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + nc.Close() + return nil, err + } + + return nec, nil +} diff --git a/cmd/ctr/main.go b/cmd/ctr/main.go index bbddb5b79..a880c72d4 100644 --- a/cmd/ctr/main.go +++ b/cmd/ctr/main.go @@ -4,8 +4,8 @@ import ( "fmt" "os" - "github.com/Sirupsen/logrus" "github.com/docker/containerd" + "github.com/sirupsen/logrus" "github.com/urfave/cli" ) diff --git a/events/nats.go b/events/nats.go new file mode 100644 index 000000000..f833567e7 --- /dev/null +++ b/events/nats.go @@ -0,0 +1,31 @@ +package events + +import ( + "context" + "strings" + + "github.com/docker/containerd/log" + nats "github.com/nats-io/go-nats" +) + +type natsPoster struct { + nec *nats.EncodedConn +} + +func GetNATSPoster(nec *nats.EncodedConn) Poster { + return &natsPoster{nec} +} + +func (p *natsPoster) Post(ctx context.Context, e Event) { + subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1) + topic := getTopic(ctx) + if topic != "" { + subject = strings.Join([]string{subject, topic}, ".") + } + + if subject == "" { + log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty") + } + + p.nec.Publish(subject, e) +} diff --git a/events/poster.go b/events/poster.go index 170f899e6..819aa468d 100644 --- a/events/poster.go +++ b/events/poster.go @@ -3,8 +3,8 @@ package events import ( "context" - "github.com/Sirupsen/logrus" "github.com/docker/containerd/log" + "github.com/sirupsen/logrus" ) var ( @@ -13,13 +13,17 @@ var ( // Poster posts the event. type Poster interface { - Post(event Event) + Post(ctx context.Context, event Event) } type posterKey struct{} +func WithPoster(ctx context.Context, poster Poster) context.Context { + return context.WithValue(ctx, posterKey{}, poster) +} + func GetPoster(ctx context.Context) Poster { - poster := ctx.Value(ctx) + poster := ctx.Value(posterKey{}) if poster == nil { logger := log.G(ctx) tx, _ := getTx(ctx) @@ -27,7 +31,7 @@ func GetPoster(ctx context.Context) Poster { // likely means we don't have a configured event system. Just return // the default poster, which merely logs events. - return posterFunc(func(event Event) { + return posterFunc(func(ctx context.Context, event Event) { fields := logrus.Fields{"event": event} if topic != "" { @@ -48,8 +52,8 @@ func GetPoster(ctx context.Context) Poster { return poster.(Poster) } -type posterFunc func(event Event) +type posterFunc func(ctx context.Context, event Event) -func (fn posterFunc) Post(event Event) { - fn(event) +func (fn posterFunc) Post(ctx context.Context, event Event) { + fn(ctx, event) } diff --git a/events/transaction.go b/events/transaction.go index c1615eb2b..c314d6104 100644 --- a/events/transaction.go +++ b/events/transaction.go @@ -18,6 +18,7 @@ func nexttxID() int64 { } type transaction struct { + ctx context.Context id int64 parent *transaction // if nil, no parent transaction finish sync.Once @@ -25,17 +26,18 @@ type transaction struct { } // begin creates a sub-transaction. -func (tx *transaction) begin(poster Poster) *transaction { +func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction { id := nexttxID() child := &transaction{ + ctx: ctx, id: id, parent: tx, start: time.Now(), } // post the transaction started event - poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events + poster.Post(ctx, child.makeTransactionEvent("begin")) // tranactions are really just events return child } @@ -44,7 +46,7 @@ func (tx *transaction) begin(poster Poster) *transaction { func (tx *transaction) commit(poster Poster) { tx.finish.Do(func() { tx.end = time.Now() - poster.Post(tx.makeTransactionEvent("commit")) + poster.Post(tx.ctx, tx.makeTransactionEvent("commit")) }) } @@ -53,7 +55,7 @@ func (tx *transaction) rollback(poster Poster, cause error) { tx.end = time.Now() event := tx.makeTransactionEvent("rollback") event = fmt.Sprintf("%s error=%q", event, cause.Error()) - poster.Post(event) + poster.Post(tx.ctx, event) }) } @@ -84,7 +86,7 @@ func getTx(ctx context.Context) (*transaction, bool) { func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { poster := G(pctx) parent, _ := getTx(pctx) - tx := parent.begin(poster) + tx := parent.begin(pctx, poster) return context.WithValue(pctx, txKey{}, tx), func() { tx.commit(poster) diff --git a/execution/events.go b/execution/events.go index f33c2db4d..47a62133a 100644 --- a/execution/events.go +++ b/execution/events.go @@ -1,8 +1,11 @@ package execution +import "time" + type ContainerEvent struct { - ID string - Action string + Timestamp time.Time + ID string + Action string } type ContainerExitEvent struct { @@ -16,6 +19,6 @@ const ( ) const ( - containerEventsSubjectFormat = "containerd.execution.container.%s" - containerProcessEventsSubjectFormat = "containerd.execution.container.%s.%s" + containerEventsTopicFormat = "container.%s" + containerProcessEventsTopicFormat = "container.%s.%s" ) diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 96b3d41eb..685b9d5d6 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -12,7 +12,9 @@ import ( "github.com/docker/containerd/execution" ) -var ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") +var ( + ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") +) func New(root string) (*OCIRuntime, error) { err := SetSubreaper(1) diff --git a/execution/log.go b/execution/log.go new file mode 100644 index 000000000..21f55e9d9 --- /dev/null +++ b/execution/log.go @@ -0,0 +1,19 @@ +package execution + +import ( + "context" + + "github.com/docker/containerd/log" + "github.com/sirupsen/logrus" +) + +var ctx context.Context + +func GetLogger(module string) *logrus.Entry { + if ctx == nil { + ctx = log.WithModule(context.Background(), "execution") + } + + subCtx := log.WithModule(ctx, module) + return log.GetLogger(subCtx) +} diff --git a/execution/service.go b/execution/service.go index 46df7f8b5..9a0837284 100644 --- a/execution/service.go +++ b/execution/service.go @@ -3,10 +3,11 @@ package execution import ( "fmt" "syscall" + "time" api "github.com/docker/containerd/api/execution" + "github.com/docker/containerd/events" google_protobuf "github.com/golang/protobuf/ptypes/empty" - "github.com/nats-io/go-nats" "github.com/opencontainers/runtime-spec/specs-go" "golang.org/x/net/context" ) @@ -16,17 +17,15 @@ var ( ErrProcessNotFound = fmt.Errorf("Process not found") ) -func New(executor Executor, nec *nats.EncodedConn) (*Service, error) { +func New(executor Executor) (*Service, error) { return &Service{ executor: executor, - nec: nec, }, nil } type Service struct { executor Executor supervisor *Supervisor - nec *nats.EncodedConn } func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { @@ -46,7 +45,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*a procs := container.Processes() initProcess := procs[0] - s.monitorProcess(container, initProcess) + s.monitorProcess(ctx, container, initProcess) return &api.CreateContainerResponse{ Container: toGRPCContainer(container), @@ -145,7 +144,7 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) return nil, err } - s.monitorProcess(container, process) + s.monitorProcess(ctx, container, process) return &api.StartProcessResponse{ Process: toGRPCProcess(process), @@ -205,27 +204,21 @@ var ( _ = (api.ExecutionServiceServer)(&Service{}) ) -func (s *Service) publishEvent(name string, v interface{}) { - if s.nec == nil { - return - } - - err := s.nec.Publish(name, v) - if err != nil { - // TODO: Use logrus? - fmt.Println("Failed to publish '%s:%#v': %v", name, v, err) - } +func (s *Service) publishEvent(ctx context.Context, topic string, v interface{}) { + ctx = events.WithTopic(ctx, topic) + events.GetPoster(ctx).Post(ctx, v) } -func (s *Service) monitorProcess(container *Container, process Process) { +func (s *Service) monitorProcess(ctx context.Context, container *Container, process Process) { go func() { status, err := process.Wait() if err == nil { - subject := GetContainerProcessEventSubject(container.ID(), process.ID()) - s.publishEvent(subject, &ContainerExitEvent{ + topic := GetContainerProcessEventTopic(container.ID(), process.ID()) + s.publishEvent(ctx, topic, &ContainerExitEvent{ ContainerEvent: ContainerEvent{ - ID: container.ID(), - Action: "exit", + Timestamp: time.Now(), + ID: container.ID(), + Action: "exit", }, PID: process.ID(), StatusCode: status, @@ -234,12 +227,12 @@ func (s *Service) monitorProcess(container *Container, process Process) { }() } -func GetContainerEventSubject(id string) string { - return fmt.Sprintf(containerEventsSubjectFormat, id) +func GetContainerEventTopic(id string) string { + return fmt.Sprintf(containerEventsTopicFormat, id) } -func GetContainerProcessEventSubject(containerID, processID string) string { - return fmt.Sprintf(containerProcessEventsSubjectFormat, containerID, processID) +func GetContainerProcessEventTopic(containerID, processID string) string { + return fmt.Sprintf(containerProcessEventsTopicFormat, containerID, processID) } func toGRPCContainer(container *Container) *api.Container { diff --git a/log/context.go b/log/context.go index 10af06f00..1081719c1 100644 --- a/log/context.go +++ b/log/context.go @@ -4,7 +4,7 @@ import ( "context" "path" - "github.com/Sirupsen/logrus" + "github.com/sirupsen/logrus" ) var (