Make sure at least one interrupt is buffered before dropping.
This commit is contained in:
@@ -82,7 +82,11 @@ type EndpointsConfig struct {
|
|||||||
// NewEndpointsConfig creates a new EndpointsConfig.
|
// NewEndpointsConfig creates a new EndpointsConfig.
|
||||||
// It immediately runs the created EndpointsConfig.
|
// It immediately runs the created EndpointsConfig.
|
||||||
func NewEndpointsConfig() *EndpointsConfig {
|
func NewEndpointsConfig() *EndpointsConfig {
|
||||||
updates := make(chan struct{})
|
// The updates channel is used to send interrupts to the Endpoints handler.
|
||||||
|
// It's buffered because we never want to block for as long as there is a
|
||||||
|
// pending interrupt, but don't want to drop them if the handler is doing
|
||||||
|
// work.
|
||||||
|
updates := make(chan struct{}, 1)
|
||||||
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
|
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
|
||||||
mux := config.NewMux(store)
|
mux := config.NewMux(store)
|
||||||
bcaster := config.NewBroadcaster()
|
bcaster := config.NewBroadcaster()
|
||||||
@@ -187,7 +191,11 @@ type ServiceConfig struct {
|
|||||||
// NewServiceConfig creates a new ServiceConfig.
|
// NewServiceConfig creates a new ServiceConfig.
|
||||||
// It immediately runs the created ServiceConfig.
|
// It immediately runs the created ServiceConfig.
|
||||||
func NewServiceConfig() *ServiceConfig {
|
func NewServiceConfig() *ServiceConfig {
|
||||||
updates := make(chan struct{})
|
// The updates channel is used to send interrupts to the Services handler.
|
||||||
|
// It's buffered because we never want to block for as long as there is a
|
||||||
|
// pending interrupt, but don't want to drop them if the handler is doing
|
||||||
|
// work.
|
||||||
|
updates := make(chan struct{}, 1)
|
||||||
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
|
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
|
||||||
mux := config.NewMux(store)
|
mux := config.NewMux(store)
|
||||||
bcaster := config.NewBroadcaster()
|
bcaster := config.NewBroadcaster()
|
||||||
|
@@ -348,3 +348,8 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
|
|||||||
handler.ValidateEndpoints(t, endpoints)
|
handler.ValidateEndpoints(t, endpoints)
|
||||||
handler2.ValidateEndpoints(t, endpoints)
|
handler2.ValidateEndpoints(t, endpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Add a unittest for interrupts getting processed in a timely manner.
|
||||||
|
// Currently this module has a circular dependency with config, and so it's
|
||||||
|
// named config_test, which means even test methods need to be public. This
|
||||||
|
// is refactoring that we can avoid by resolving the dependency.
|
||||||
|
Reference in New Issue
Block a user