simplify pluginwatcher closing
This commit is contained in:
		| @@ -39,10 +39,10 @@ import ( | ||||
| type Watcher struct { | ||||
| 	path           string | ||||
| 	deprecatedPath string | ||||
| 	stopCh         chan interface{} | ||||
| 	stopCh         chan struct{} | ||||
| 	stopped        chan struct{} | ||||
| 	fs             utilfs.Filesystem | ||||
| 	fsWatcher      *fsnotify.Watcher | ||||
| 	wg             sync.WaitGroup | ||||
|  | ||||
| 	mutex       sync.Mutex | ||||
| 	handlers    map[string]PluginHandler | ||||
| @@ -88,7 +88,8 @@ func (w *Watcher) getHandler(pluginType string) (PluginHandler, bool) { | ||||
| // Start watches for the creation of plugin sockets at the path | ||||
| func (w *Watcher) Start() error { | ||||
| 	klog.V(2).Infof("Plugin Watcher Start at %s", w.path) | ||||
| 	w.stopCh = make(chan interface{}) | ||||
| 	w.stopCh = make(chan struct{}) | ||||
| 	w.stopped = make(chan struct{}) | ||||
|  | ||||
| 	// Creating the directory to be watched if it doesn't exist yet, | ||||
| 	// and walks through the directory to discover the existing plugins. | ||||
| @@ -104,22 +105,20 @@ func (w *Watcher) Start() error { | ||||
|  | ||||
| 	// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine. | ||||
| 	if err := w.traversePluginDir(w.path); err != nil { | ||||
| 		w.Stop() | ||||
| 		w.fsWatcher.Close() | ||||
| 		return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err) | ||||
| 	} | ||||
|  | ||||
| 	// Traverse deprecated plugin dir, if specified. | ||||
| 	if len(w.deprecatedPath) != 0 { | ||||
| 		if err := w.traversePluginDir(w.deprecatedPath); err != nil { | ||||
| 			w.Stop() | ||||
| 			w.fsWatcher.Close() | ||||
| 			return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	w.wg.Add(1) | ||||
| 	go func(fsWatcher *fsnotify.Watcher) { | ||||
| 		defer w.wg.Done() | ||||
|  | ||||
| 	go func() { | ||||
| 		defer close(w.stopped) | ||||
| 		for { | ||||
| 			select { | ||||
| 			case event := <-fsWatcher.Events: | ||||
| @@ -135,17 +134,15 @@ func (w *Watcher) Start() error { | ||||
| 						klog.Errorf("error %v when handling delete event: %s", err, event) | ||||
| 					} | ||||
| 				} | ||||
| 				continue | ||||
| 			case err := <-fsWatcher.Errors: | ||||
| 				if err != nil { | ||||
| 					klog.Errorf("fsWatcher received error: %v", err) | ||||
| 				} | ||||
| 				continue | ||||
| 			case <-w.stopCh: | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}(fsWatcher) | ||||
| 	}() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -154,18 +151,9 @@ func (w *Watcher) Start() error { | ||||
| func (w *Watcher) Stop() error { | ||||
| 	close(w.stopCh) | ||||
|  | ||||
| 	c := make(chan struct{}) | ||||
| 	var once sync.Once | ||||
| 	closeFunc := func() { close(c) } | ||||
| 	go func() { | ||||
| 		defer once.Do(closeFunc) | ||||
| 		w.wg.Wait() | ||||
| 	}() | ||||
|  | ||||
| 	select { | ||||
| 	case <-c: | ||||
| 	case <-w.stopped: | ||||
| 	case <-time.After(11 * time.Second): | ||||
| 		once.Do(closeFunc) | ||||
| 		return fmt.Errorf("timeout on stopping watcher") | ||||
| 	} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Mike Danese
					Mike Danese