164 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			164 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package eventq
 | |
| 
 | |
| import (
 | |
| 	"io"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| type EventQueue[T any] struct {
 | |
| 	events      chan<- T
 | |
| 	subscriberC chan<- eventSubscription[T]
 | |
| 	shutdownC   chan struct{}
 | |
| }
 | |
| 
 | |
| type eventSubscription[T any] struct {
 | |
| 	c      chan<- T
 | |
| 	closeC chan struct{}
 | |
| }
 | |
| 
 | |
| func (sub eventSubscription[T]) publish(event T) bool {
 | |
| 	select {
 | |
| 	case <-sub.closeC:
 | |
| 		return false
 | |
| 	case sub.c <- event:
 | |
| 		return true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (sub eventSubscription[T]) Close() error {
 | |
| 	select {
 | |
| 	case <-sub.closeC:
 | |
| 	default:
 | |
| 		close(sub.closeC)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // New provides a queue for sending messages to one or more
 | |
| // subscribers. Messages are held for the given discardAfter duration
 | |
| // if there are no subscribers.
 | |
| func New[T any](discardAfter time.Duration, discardFn func(T)) EventQueue[T] {
 | |
| 	events := make(chan T)
 | |
| 	subscriberC := make(chan eventSubscription[T])
 | |
| 	shutdownC := make(chan struct{})
 | |
| 
 | |
| 	go func() {
 | |
| 		type queuedEvent struct {
 | |
| 			event     T
 | |
| 			discardAt time.Time
 | |
| 		}
 | |
| 
 | |
| 		var discardQueue []queuedEvent
 | |
| 		var discardTime <-chan time.Time
 | |
| 		var subscribers []eventSubscription[T]
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-shutdownC:
 | |
| 				for _, event := range discardQueue {
 | |
| 					discardFn(event.event)
 | |
| 				}
 | |
| 				for _, sub := range subscribers {
 | |
| 					close(sub.c)
 | |
| 				}
 | |
| 				return
 | |
| 			case event := <-events:
 | |
| 				if len(subscribers) > 0 {
 | |
| 					active := subscribers[:0]
 | |
| 					for _, sub := range subscribers {
 | |
| 						if sub.publish(event) {
 | |
| 							active = append(active, sub)
 | |
| 						}
 | |
| 					}
 | |
| 					subscribers = active
 | |
| 				}
 | |
| 				if len(subscribers) == 0 {
 | |
| 					discardQueue = append(discardQueue, queuedEvent{
 | |
| 						event:     event,
 | |
| 						discardAt: time.Now().Add(discardAfter),
 | |
| 					})
 | |
| 					if discardTime == nil {
 | |
| 						discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs())
 | |
| 					}
 | |
| 				}
 | |
| 			case s := <-subscriberC:
 | |
| 				var closed bool
 | |
| 				for i, event := range discardQueue {
 | |
| 					if !s.publish(event.event) {
 | |
| 						discardQueue = discardQueue[i:]
 | |
| 						closed = true
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 				if !closed {
 | |
| 					discardQueue = nil
 | |
| 					discardTime = nil
 | |
| 					subscribers = append(subscribers, s)
 | |
| 				}
 | |
| 			case t := <-discardTime:
 | |
| 				toDiscard := discardQueue
 | |
| 				discardQueue = nil
 | |
| 				for i, event := range toDiscard {
 | |
| 					if t.After(event.discardAt) {
 | |
| 						discardFn(event.event)
 | |
| 					} else {
 | |
| 						discardQueue = toDiscard[i:]
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 				if len(discardQueue) == 0 {
 | |
| 					discardTime = nil
 | |
| 				} else {
 | |
| 					// Wait until next item in discard queue plus a small buffer to collect a burst of events
 | |
| 					discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs() + 10*time.Millisecond)
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return EventQueue[T]{
 | |
| 		events:      events,
 | |
| 		subscriberC: subscriberC,
 | |
| 		shutdownC:   shutdownC,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (eq *EventQueue[T]) Shutdown() {
 | |
| 	defer close(eq.shutdownC)
 | |
| 	eq.shutdownC <- struct{}{}
 | |
| }
 | |
| 
 | |
| func (eq *EventQueue[T]) Send(event T) {
 | |
| 	select {
 | |
| 	case <-eq.shutdownC:
 | |
| 	case eq.events <- event:
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (eq *EventQueue[T]) Subscribe() (<-chan T, io.Closer) {
 | |
| 	c := make(chan T, 100)
 | |
| 	subscription := eventSubscription[T]{
 | |
| 		c:      c,
 | |
| 		closeC: make(chan struct{}),
 | |
| 	}
 | |
| 	eq.subscriberC <- subscription
 | |
| 
 | |
| 	return c, subscription
 | |
| }
 | 
