Improve Start functions
This commit is contained in:
@@ -17,9 +17,9 @@ limitations under the License.
|
|||||||
package wait
|
package wait
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
@@ -37,12 +37,34 @@ var ForeverTestTimeout = time.Second * 30
|
|||||||
// NeverStop may be passed to Until to make it never stop.
|
// NeverStop may be passed to Until to make it never stop.
|
||||||
var NeverStop <-chan struct{} = make(chan struct{})
|
var NeverStop <-chan struct{} = make(chan struct{})
|
||||||
|
|
||||||
// StartUntil starts f in a new goroutine and calls done once f has finished.
|
// Group is an interface to decouple code from sync.WaitGroup.
|
||||||
func StartUntil(stopCh <-chan struct{}, wg *sync.WaitGroup, f func(stopCh <-chan struct{})) {
|
type Group interface {
|
||||||
wg.Add(1)
|
Add(delta int)
|
||||||
go func() {
|
Done()
|
||||||
defer wg.Done()
|
}
|
||||||
|
|
||||||
|
// StartWithChannelWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished.
|
||||||
|
// stopCh is passed to f as an argument. f should stop when stopCh is available.
|
||||||
|
func StartWithChannelWithinGroup(stopCh <-chan struct{}, g Group, f func(stopCh <-chan struct{})) {
|
||||||
|
StartWithinGroup(g, func() {
|
||||||
f(stopCh)
|
f(stopCh)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartWithContextWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished.
|
||||||
|
// ctx is passed to f as an argument. f should stop when ctx.Done() is available.
|
||||||
|
func StartWithContextWithinGroup(ctx context.Context, g Group, f func(context.Context)) {
|
||||||
|
StartWithinGroup(g, func() {
|
||||||
|
f(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished.
|
||||||
|
func StartWithinGroup(g Group, f func()) {
|
||||||
|
g.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer g.Done()
|
||||||
|
f()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -119,7 +119,7 @@ func (c *controller) Run(stopCh <-chan struct{}) {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer wg.Wait()
|
defer wg.Wait()
|
||||||
|
|
||||||
wait.StartUntil(stopCh, &wg, r.Run)
|
wait.StartWithChannelWithinGroup(stopCh, &wg, r.Run)
|
||||||
|
|
||||||
wait.Until(c.processLoop, time.Second, stopCh)
|
wait.Until(c.processLoop, time.Second, stopCh)
|
||||||
}
|
}
|
||||||
|
@@ -211,8 +211,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
|||||||
|
|
||||||
defer s.wg.Wait()
|
defer s.wg.Wait()
|
||||||
|
|
||||||
wait.StartUntil(stopCh, &s.wg, s.cacheMutationDetector.Run)
|
wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.cacheMutationDetector.Run)
|
||||||
wait.StartUntil(stopCh, &s.wg, s.processor.run)
|
wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.processor.run)
|
||||||
s.controller.Run(stopCh)
|
s.controller.Run(stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -327,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
|
|||||||
|
|
||||||
s.processor.addListener(listener)
|
s.processor.addListener(listener)
|
||||||
|
|
||||||
wait.StartUntil(s.stopCh, &s.wg, listener.run)
|
wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.run)
|
||||||
wait.StartUntil(s.stopCh, &s.wg, listener.pop)
|
wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.pop)
|
||||||
|
|
||||||
items := s.indexer.List()
|
items := s.indexer.List()
|
||||||
for i := range items {
|
for i := range items {
|
||||||
@@ -403,8 +403,8 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
|||||||
p.listenersLock.RLock()
|
p.listenersLock.RLock()
|
||||||
defer p.listenersLock.RUnlock()
|
defer p.listenersLock.RUnlock()
|
||||||
for _, listener := range p.listeners {
|
for _, listener := range p.listeners {
|
||||||
wait.StartUntil(stopCh, &wg, listener.run)
|
wait.StartWithChannelWithinGroup(stopCh, &wg, listener.run)
|
||||||
wait.StartUntil(stopCh, &wg, listener.pop)
|
wait.StartWithChannelWithinGroup(stopCh, &wg, listener.pop)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
Reference in New Issue
Block a user