From d50e25360c8ed988a4e5af1f492d249c9315f8ed Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 28 Aug 2018 10:37:04 -0400 Subject: [PATCH] Add context cancel for epoll Signed-off-by: Michael Crosby --- runtime/v2/runc/epoll.go | 22 ++++++++++++++-------- runtime/v2/runc/service.go | 12 ++++++++++-- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go index e57ec7b4c..6aea9b8c1 100644 --- a/runtime/v2/runc/epoll.go +++ b/runtime/v2/runc/epoll.go @@ -62,15 +62,21 @@ func (e *epoller) Close() error { func (e *epoller) run(ctx context.Context) { var events [128]unix.EpollEvent for { - n, err := unix.EpollWait(e.fd, events[:], -1) - if err != nil { - if err == unix.EINTR { - continue + select { + case <-ctx.Done(): + e.Close() + return + default: + n, err := unix.EpollWait(e.fd, events[:], -1) + if err != nil { + if err == unix.EINTR { + continue + } + logrus.WithError(err).Error("cgroups: epoll wait") + } + for i := 0; i < n; i++ { + e.process(ctx, uintptr(events[i].Fd)) } - logrus.WithError(err).Error("cgroups: epoll wait") - } - for i := 0; i < n; i++ { - e.process(ctx, uintptr(events[i].Fd)) } } } diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/service.go index 55dbeb069..3800b327a 100644 --- a/runtime/v2/runc/service.go +++ b/runtime/v2/runc/service.go @@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, if err != nil { return nil, err } + ctx, cancel := context.WithCancel(ctx) go ep.run(ctx) s := &service{ id: id, @@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, events: make(chan interface{}, 128), ec: shim.Default.Subscribe(), ep: ep, + cancel: cancel, } go s.processExits() runcC.Monitor = shim.Default if err := s.initPlatform(); err != nil { + cancel() return nil, errors.Wrap(err, "failed to initialized platform behavior") } go s.forward(publisher) @@ -101,6 +104,7 @@ type service struct { id string bundle string cg cgroups.Cgroup + cancel func() } func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { @@ -579,6 +583,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task } func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { + s.cancel() os.Exit(0) return empty, nil } @@ -698,7 +703,10 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *service) forward(publisher events.Publisher) { for e := range s.events { - if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { + ctx, cancel := context.WithTimeout(s.context, 5*time.Second) + err := publisher.Publish(ctx, getTopic(e), e) + cancel() + if err != nil { logrus.WithError(err).Error("post event") } } @@ -722,7 +730,7 @@ func (s *service) setCgroup(cg cgroups.Cgroup) { } } -func getTopic(ctx context.Context, e interface{}) string { +func getTopic(e interface{}) string { switch e.(type) { case *eventstypes.TaskCreate: return runtime.TaskCreateEventTopic