kubelet: upgrade sourceFile to use fsnotify
Mitigate some flakes for deleted watch directories and use the maintained fsnotify package.
This commit is contained in:
@@ -26,7 +26,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sigma/go-inotify"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@@ -77,30 +77,30 @@ func (s *sourceFile) doWatch() error {
|
||||
return &retryableError{"path does not exist, ignoring"}
|
||||
}
|
||||
|
||||
w, err := inotify.NewWatcher()
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create inotify: %v", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE|inotify.IN_ATTRIB)
|
||||
err = w.Add(s.path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-w.Event:
|
||||
if err = s.produceWatchEvent(event); err != nil {
|
||||
case event := <-w.Events:
|
||||
if err = s.produceWatchEvent(&event); err != nil {
|
||||
return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
|
||||
}
|
||||
case err = <-w.Error:
|
||||
case err = <-w.Errors:
|
||||
return fmt.Errorf("error while watching %q: %v", s.path, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sourceFile) produceWatchEvent(e *inotify.Event) error {
|
||||
func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
|
||||
// Ignore file start with dots
|
||||
if strings.HasPrefix(filepath.Base(e.Name), ".") {
|
||||
klog.V(4).Infof("Ignored pod manifest: %s, because it starts with dots", e.Name)
|
||||
@@ -108,23 +108,16 @@ func (s *sourceFile) produceWatchEvent(e *inotify.Event) error {
|
||||
}
|
||||
var eventType podEventType
|
||||
switch {
|
||||
case (e.Mask & inotify.IN_ISDIR) > 0:
|
||||
klog.Errorf("Not recursing into manifest path %q", s.path)
|
||||
return nil
|
||||
case (e.Mask & inotify.IN_CREATE) > 0:
|
||||
case (e.Op & fsnotify.Create) > 0:
|
||||
eventType = podAdd
|
||||
case (e.Mask & inotify.IN_MOVED_TO) > 0:
|
||||
eventType = podAdd
|
||||
case (e.Mask & inotify.IN_MODIFY) > 0:
|
||||
case (e.Op & fsnotify.Write) > 0:
|
||||
eventType = podModify
|
||||
case (e.Mask & inotify.IN_ATTRIB) > 0:
|
||||
case (e.Op & fsnotify.Chmod) > 0:
|
||||
eventType = podModify
|
||||
case (e.Mask & inotify.IN_DELETE) > 0:
|
||||
case (e.Op & fsnotify.Remove) > 0:
|
||||
eventType = podDelete
|
||||
case (e.Mask & inotify.IN_MOVED_FROM) > 0:
|
||||
case (e.Op & fsnotify.Rename) > 0:
|
||||
eventType = podDelete
|
||||
case (e.Mask & inotify.IN_DELETE_SELF) > 0:
|
||||
return fmt.Errorf("the watched path is deleted")
|
||||
default:
|
||||
// Ignore rest events
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user