Add context cancel for epoll

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2018-08-28 10:37:04 -04:00
parent d89ba5ee08
commit d50e25360c
2 changed files with 24 additions and 10 deletions

View File

@ -62,15 +62,21 @@ func (e *epoller) Close() error {
func (e *epoller) run(ctx context.Context) { func (e *epoller) run(ctx context.Context) {
var events [128]unix.EpollEvent var events [128]unix.EpollEvent
for { for {
n, err := unix.EpollWait(e.fd, events[:], -1) select {
if err != nil { case <-ctx.Done():
if err == unix.EINTR { e.Close()
continue return
default:
n, err := unix.EpollWait(e.fd, events[:], -1)
if err != nil {
if err == unix.EINTR {
continue
}
logrus.WithError(err).Error("cgroups: epoll wait")
}
for i := 0; i < n; i++ {
e.process(ctx, uintptr(events[i].Fd))
} }
logrus.WithError(err).Error("cgroups: epoll wait")
}
for i := 0; i < n; i++ {
e.process(ctx, uintptr(events[i].Fd))
} }
} }
} }

View File

@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx, cancel := context.WithCancel(ctx)
go ep.run(ctx) go ep.run(ctx)
s := &service{ s := &service{
id: id, id: id,
@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(), ec: shim.Default.Subscribe(),
ep: ep, ep: ep,
cancel: cancel,
} }
go s.processExits() go s.processExits()
runcC.Monitor = shim.Default runcC.Monitor = shim.Default
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
cancel()
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")
} }
go s.forward(publisher) go s.forward(publisher)
@ -101,6 +104,7 @@ type service struct {
id string id string
bundle string bundle string
cg cgroups.Cgroup cg cgroups.Cgroup
cancel func()
} }
func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
@ -579,6 +583,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
} }
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.cancel()
os.Exit(0) os.Exit(0)
return empty, nil return empty, nil
} }
@ -698,7 +703,10 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
func (s *service) forward(publisher events.Publisher) { func (s *service) forward(publisher events.Publisher) {
for e := range s.events { for e := range s.events {
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
logrus.WithError(err).Error("post event") logrus.WithError(err).Error("post event")
} }
} }
@ -722,7 +730,7 @@ func (s *service) setCgroup(cg cgroups.Cgroup) {
} }
} }
func getTopic(ctx context.Context, e interface{}) string { func getTopic(e interface{}) string {
switch e.(type) { switch e.(type) {
case *eventstypes.TaskCreate: case *eventstypes.TaskCreate:
return runtime.TaskCreateEventTopic return runtime.TaskCreateEventTopic