diff --git a/pkg/server/events.go b/pkg/server/events.go index 46eb68f49..6837acf0a 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "sync" "time" eventtypes "github.com/containerd/containerd/api/events" @@ -56,7 +57,9 @@ type eventMonitor struct { } type backOff struct { - queuePool map[string]*backOffQueue + queuePool map[string]*backOffQueue + // tickerMu is mutex used to protect the ticker. + tickerMu sync.Mutex ticker *time.Ticker minDuration time.Duration maxDuration time.Duration @@ -120,8 +123,8 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { return nil, errors.New("event channel is nil") } closeCh := make(chan struct{}) + backOffCheckCh := em.backOff.start() go func() { - backOffCheckCh := em.backOff.start() for { select { case e := <-em.ch: @@ -366,12 +369,18 @@ func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.D } func (b *backOff) start() <-chan time.Time { + b.tickerMu.Lock() + defer b.tickerMu.Unlock() b.ticker = time.NewTicker(b.checkDuration) return b.ticker.C } func (b *backOff) stop() { - b.ticker.Stop() + b.tickerMu.Lock() + defer b.tickerMu.Unlock() + if b.ticker != nil { + b.ticker.Stop() + } } func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {