dependencies: update to gomega v1.22.1 and ginkgo v2.3.1

This adds support for timeouts and intermediate reports in Eventually and
Consistently.
This commit is contained in:
Patrick Ohly
2022-10-07 17:19:09 +02:00
parent 48608cfe60
commit d1dbf7ae3e
57 changed files with 2043 additions and 937 deletions

View File

@@ -1,7 +1,6 @@
package interrupt_handler
import (
"fmt"
"os"
"os/signal"
"sync"
@@ -11,27 +10,29 @@ import (
"github.com/onsi/ginkgo/v2/internal/parallel_support"
)
const TIMEOUT_REPEAT_INTERRUPT_MAXIMUM_DURATION = 30 * time.Second
const TIMEOUT_REPEAT_INTERRUPT_FRACTION_OF_TIMEOUT = 10
const ABORT_POLLING_INTERVAL = 500 * time.Millisecond
const ABORT_REPEAT_INTERRUPT_DURATION = 30 * time.Second
type InterruptCause uint
const (
InterruptCauseInvalid InterruptCause = iota
InterruptCauseSignal
InterruptCauseTimeout
InterruptCauseAbortByOtherProcess
)
type InterruptLevel uint
const (
InterruptLevelUninterrupted InterruptLevel = iota
InterruptLevelCleanupAndReport
InterruptLevelReportOnly
InterruptLevelBailOut
)
func (ic InterruptCause) String() string {
switch ic {
case InterruptCauseSignal:
return "Interrupted by User"
case InterruptCauseTimeout:
return "Interrupted by Timeout"
case InterruptCauseAbortByOtherProcess:
return "Interrupted by Other Ginkgo Process"
}
@@ -39,37 +40,49 @@ func (ic InterruptCause) String() string {
}
type InterruptStatus struct {
Interrupted bool
Channel chan interface{}
Cause InterruptCause
Channel chan interface{}
Level InterruptLevel
Cause InterruptCause
}
func (s InterruptStatus) Interrupted() bool {
return s.Level != InterruptLevelUninterrupted
}
func (s InterruptStatus) Message() string {
return s.Cause.String()
}
func (s InterruptStatus) ShouldIncludeProgressReport() bool {
return s.Cause != InterruptCauseAbortByOtherProcess
}
type InterruptHandlerInterface interface {
Status() InterruptStatus
SetInterruptPlaceholderMessage(string)
ClearInterruptPlaceholderMessage()
InterruptMessage() (string, bool)
}
type InterruptHandler struct {
c chan interface{}
lock *sync.Mutex
interrupted bool
interruptPlaceholderMessage string
interruptCause InterruptCause
client parallel_support.Client
stop chan interface{}
c chan interface{}
lock *sync.Mutex
level InterruptLevel
cause InterruptCause
client parallel_support.Client
stop chan interface{}
signals []os.Signal
}
func NewInterruptHandler(timeout time.Duration, client parallel_support.Client) *InterruptHandler {
handler := &InterruptHandler{
c: make(chan interface{}),
lock: &sync.Mutex{},
interrupted: false,
stop: make(chan interface{}),
client: client,
func NewInterruptHandler(client parallel_support.Client, signals ...os.Signal) *InterruptHandler {
if len(signals) == 0 {
signals = []os.Signal{os.Interrupt, syscall.SIGTERM}
}
handler.registerForInterrupts(timeout)
handler := &InterruptHandler{
c: make(chan interface{}),
lock: &sync.Mutex{},
stop: make(chan interface{}),
client: client,
signals: signals,
}
handler.registerForInterrupts()
return handler
}
@@ -77,30 +90,22 @@ func (handler *InterruptHandler) Stop() {
close(handler.stop)
}
func (handler *InterruptHandler) registerForInterrupts(timeout time.Duration) {
func (handler *InterruptHandler) registerForInterrupts() {
// os signal handling
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
// timeout handling
var timeoutChannel <-chan time.Time
var timeoutTimer *time.Timer
if timeout > 0 {
timeoutTimer = time.NewTimer(timeout)
timeoutChannel = timeoutTimer.C
}
signal.Notify(signalChannel, handler.signals...)
// cross-process abort handling
var abortChannel chan bool
var abortChannel chan interface{}
if handler.client != nil {
abortChannel = make(chan bool)
abortChannel = make(chan interface{})
go func() {
pollTicker := time.NewTicker(ABORT_POLLING_INTERVAL)
for {
select {
case <-pollTicker.C:
if handler.client.ShouldAbort() {
abortChannel <- true
close(abortChannel)
pollTicker.Stop()
return
}
@@ -112,55 +117,37 @@ func (handler *InterruptHandler) registerForInterrupts(timeout time.Duration) {
}()
}
// listen for any interrupt signals
// note that some (timeouts, cross-process aborts) will only trigger once
// for these we set up a ticker to keep interrupting the suite until it ends
// this ensures any `AfterEach` or `AfterSuite`s that get stuck cleaning up
// get interrupted eventually
go func() {
go func(abortChannel chan interface{}) {
var interruptCause InterruptCause
var repeatChannel <-chan time.Time
var repeatTicker *time.Ticker
for {
select {
case <-signalChannel:
interruptCause = InterruptCauseSignal
case <-timeoutChannel:
interruptCause = InterruptCauseTimeout
repeatInterruptTimeout := timeout / time.Duration(TIMEOUT_REPEAT_INTERRUPT_FRACTION_OF_TIMEOUT)
if repeatInterruptTimeout > TIMEOUT_REPEAT_INTERRUPT_MAXIMUM_DURATION {
repeatInterruptTimeout = TIMEOUT_REPEAT_INTERRUPT_MAXIMUM_DURATION
}
timeoutTimer.Stop()
repeatTicker = time.NewTicker(repeatInterruptTimeout)
repeatChannel = repeatTicker.C
case <-abortChannel:
interruptCause = InterruptCauseAbortByOtherProcess
repeatTicker = time.NewTicker(ABORT_REPEAT_INTERRUPT_DURATION)
repeatChannel = repeatTicker.C
case <-repeatChannel:
//do nothing, just interrupt again using the same interruptCause
case <-handler.stop:
if timeoutTimer != nil {
timeoutTimer.Stop()
}
if repeatTicker != nil {
repeatTicker.Stop()
}
signal.Stop(signalChannel)
return
}
abortChannel = nil
handler.lock.Lock()
handler.interruptCause = interruptCause
if handler.interruptPlaceholderMessage != "" {
fmt.Println(handler.interruptPlaceholderMessage)
oldLevel := handler.level
handler.cause = interruptCause
if handler.level == InterruptLevelUninterrupted {
handler.level = InterruptLevelCleanupAndReport
} else if handler.level == InterruptLevelCleanupAndReport {
handler.level = InterruptLevelReportOnly
} else if handler.level == InterruptLevelReportOnly {
handler.level = InterruptLevelBailOut
}
if handler.level != oldLevel {
close(handler.c)
handler.c = make(chan interface{})
}
handler.interrupted = true
close(handler.c)
handler.c = make(chan interface{})
handler.lock.Unlock()
}
}()
}(abortChannel)
}
func (handler *InterruptHandler) Status() InterruptStatus {
@@ -168,29 +155,8 @@ func (handler *InterruptHandler) Status() InterruptStatus {
defer handler.lock.Unlock()
return InterruptStatus{
Interrupted: handler.interrupted,
Channel: handler.c,
Cause: handler.interruptCause,
Level: handler.level,
Channel: handler.c,
Cause: handler.cause,
}
}
func (handler *InterruptHandler) SetInterruptPlaceholderMessage(message string) {
handler.lock.Lock()
defer handler.lock.Unlock()
handler.interruptPlaceholderMessage = message
}
func (handler *InterruptHandler) ClearInterruptPlaceholderMessage() {
handler.lock.Lock()
defer handler.lock.Unlock()
handler.interruptPlaceholderMessage = ""
}
func (handler *InterruptHandler) InterruptMessage() (string, bool) {
handler.lock.Lock()
out := fmt.Sprintf("%s", handler.interruptCause.String())
defer handler.lock.Unlock()
return out, handler.interruptCause != InterruptCauseAbortByOtherProcess
}