Add missing "/tasks/exec-started" event topic
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
parent
dbd3eff1e6
commit
3f34c421d3
@ -104,7 +104,12 @@ func main() {
|
|||||||
if err := serve(server, socket); err != nil {
|
if err := serve(server, socket); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return handleSignals(signals, server, sv)
|
logger := logrus.WithFields(logrus.Fields{
|
||||||
|
"pid": os.Getpid(),
|
||||||
|
"path": path,
|
||||||
|
"namespace": context.GlobalString("namespace"),
|
||||||
|
})
|
||||||
|
return handleSignals(logger, signals, server, sv)
|
||||||
}
|
}
|
||||||
if err := app.Run(os.Args); err != nil {
|
if err := app.Run(os.Args); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
|
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
|
||||||
@ -139,7 +144,7 @@ func serve(server *grpc.Server, path string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleSignals(signals chan os.Signal, server *grpc.Server, sv *shim.Service) error {
|
func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *grpc.Server, sv *shim.Service) error {
|
||||||
var (
|
var (
|
||||||
termOnce sync.Once
|
termOnce sync.Once
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
@ -153,7 +158,7 @@ func handleSignals(signals chan os.Signal, server *grpc.Server, sv *shim.Service
|
|||||||
switch s {
|
switch s {
|
||||||
case unix.SIGCHLD:
|
case unix.SIGCHLD:
|
||||||
if err := reaper.Reap(); err != nil {
|
if err := reaper.Reap(); err != nil {
|
||||||
logrus.WithError(err).Error("reap exit status")
|
logger.WithError(err).Error("reap exit status")
|
||||||
}
|
}
|
||||||
case unix.SIGTERM, unix.SIGINT:
|
case unix.SIGTERM, unix.SIGINT:
|
||||||
go termOnce.Do(func() {
|
go termOnce.Do(func() {
|
||||||
@ -167,13 +172,13 @@ func handleSignals(signals chan os.Signal, server *grpc.Server, sv *shim.Service
|
|||||||
close(done)
|
close(done)
|
||||||
})
|
})
|
||||||
case unix.SIGUSR1:
|
case unix.SIGUSR1:
|
||||||
dumpStacks()
|
dumpStacks(logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func dumpStacks() {
|
func dumpStacks(logger *logrus.Entry) {
|
||||||
var (
|
var (
|
||||||
buf []byte
|
buf []byte
|
||||||
stackSize int
|
stackSize int
|
||||||
@ -185,7 +190,7 @@ func dumpStacks() {
|
|||||||
bufferLen *= 2
|
bufferLen *= 2
|
||||||
}
|
}
|
||||||
buf = buf[:stackSize]
|
buf = buf[:stackSize]
|
||||||
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
|
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
||||||
|
@ -185,11 +185,11 @@ func validateTopic(topic string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if topic[0] != '/' {
|
if topic[0] != '/' {
|
||||||
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic)
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(topic) == 1 {
|
if len(topic) == 1 {
|
||||||
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic)
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component")
|
||||||
}
|
}
|
||||||
|
|
||||||
components := strings.Split(topic[1:], "/")
|
components := strings.Split(topic[1:], "/")
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,6 +36,11 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S
|
|||||||
return nil, fmt.Errorf("shim namespace cannot be empty")
|
return nil, fmt.Errorf("shim namespace cannot be empty")
|
||||||
}
|
}
|
||||||
context := namespaces.WithNamespace(context.Background(), namespace)
|
context := namespaces.WithNamespace(context.Background(), namespace)
|
||||||
|
context = log.WithLogger(context, logrus.WithFields(logrus.Fields{
|
||||||
|
"namespace": namespace,
|
||||||
|
"pid": os.Getpid(),
|
||||||
|
"path": path,
|
||||||
|
}))
|
||||||
s := &Service{
|
s := &Service{
|
||||||
path: path,
|
path: path,
|
||||||
processes: make(map[string]process),
|
processes: make(map[string]process),
|
||||||
@ -417,13 +423,13 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
|
|||||||
|
|
||||||
func (s *Service) forward(publisher events.Publisher) {
|
func (s *Service) forward(publisher events.Publisher) {
|
||||||
for e := range s.events {
|
for e := range s.events {
|
||||||
if err := publisher.Publish(s.context, getTopic(e), e); err != nil {
|
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
|
||||||
log.G(s.context).WithError(err).Error("post event")
|
log.G(s.context).WithError(err).Error("post event")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTopic(e interface{}) string {
|
func getTopic(ctx context.Context, e interface{}) string {
|
||||||
switch e.(type) {
|
switch e.(type) {
|
||||||
case *eventsapi.TaskCreate:
|
case *eventsapi.TaskCreate:
|
||||||
return runtime.TaskCreateEventTopic
|
return runtime.TaskCreateEventTopic
|
||||||
@ -437,12 +443,16 @@ func getTopic(e interface{}) string {
|
|||||||
return runtime.TaskDeleteEventTopic
|
return runtime.TaskDeleteEventTopic
|
||||||
case *eventsapi.TaskExecAdded:
|
case *eventsapi.TaskExecAdded:
|
||||||
return runtime.TaskExecAddedEventTopic
|
return runtime.TaskExecAddedEventTopic
|
||||||
|
case *eventsapi.TaskExecStarted:
|
||||||
|
return runtime.TaskExecStartedEventTopic
|
||||||
case *eventsapi.TaskPaused:
|
case *eventsapi.TaskPaused:
|
||||||
return runtime.TaskPausedEventTopic
|
return runtime.TaskPausedEventTopic
|
||||||
case *eventsapi.TaskResumed:
|
case *eventsapi.TaskResumed:
|
||||||
return runtime.TaskResumedEventTopic
|
return runtime.TaskResumedEventTopic
|
||||||
case *eventsapi.TaskCheckpointed:
|
case *eventsapi.TaskCheckpointed:
|
||||||
return runtime.TaskCheckpointedEventTopic
|
return runtime.TaskCheckpointedEventTopic
|
||||||
|
default:
|
||||||
|
log.G(ctx).Warnf("no topic for type %#v", e)
|
||||||
}
|
}
|
||||||
return "?"
|
return runtime.TaskUnknownTopic
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,9 @@ const (
|
|||||||
TaskExitEventTopic = "/tasks/exit"
|
TaskExitEventTopic = "/tasks/exit"
|
||||||
TaskDeleteEventTopic = "/tasks/delete"
|
TaskDeleteEventTopic = "/tasks/delete"
|
||||||
TaskExecAddedEventTopic = "/tasks/exec-added"
|
TaskExecAddedEventTopic = "/tasks/exec-added"
|
||||||
|
TaskExecStartedEventTopic = "/tasks/exec-started"
|
||||||
TaskPausedEventTopic = "/tasks/paused"
|
TaskPausedEventTopic = "/tasks/paused"
|
||||||
TaskResumedEventTopic = "/tasks/resumed"
|
TaskResumedEventTopic = "/tasks/resumed"
|
||||||
TaskCheckpointedEventTopic = "/tasks/checkpointed"
|
TaskCheckpointedEventTopic = "/tasks/checkpointed"
|
||||||
|
TaskUnknownTopic = "/tasks/?"
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user