221 lines
6.6 KiB
Go
221 lines
6.6 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package utils
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/kubernetes/test/e2e/framework"
|
|
)
|
|
|
|
// LogChecker is an interface for an entity that can check whether logging
|
|
// backend contains all wanted log entries.
|
|
type LogChecker interface {
|
|
EntriesIngested() (bool, error)
|
|
Timeout() error
|
|
}
|
|
|
|
// IngestionPred is a type of a function that checks whether all required
|
|
// log entries were ingested.
|
|
type IngestionPred func(string, []LogEntry) (bool, error)
|
|
|
|
// UntilFirstEntry is a IngestionPred that checks that at least one entry was
|
|
// ingested.
|
|
var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
|
|
return len(entries) > 0, nil
|
|
}
|
|
|
|
// UntilFirstEntryFromLog is a IngestionPred that checks that at least one
|
|
// entry from the log with a given name was ingested.
|
|
func UntilFirstEntryFromLog(log string) IngestionPred {
|
|
return func(_ string, entries []LogEntry) (bool, error) {
|
|
for _, e := range entries {
|
|
if e.LogName == log {
|
|
if e.Location != framework.TestContext.CloudConfig.Zone {
|
|
return false, fmt.Errorf("Bad location in logs '%s' != '%d'", e.Location, framework.TestContext.CloudConfig.Zone)
|
|
}
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// TimeoutFun is a function that is called when the waiting times out.
|
|
type TimeoutFun func([]string, []bool) error
|
|
|
|
// JustTimeout returns the error with the list of names for which backend is
|
|
// still still missing logs.
|
|
var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
|
|
failedNames := []string{}
|
|
for i, name := range names {
|
|
if !ingested[i] {
|
|
failedNames = append(failedNames, name)
|
|
}
|
|
}
|
|
return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
|
|
strings.Join(failedNames, ","))
|
|
}
|
|
|
|
var _ LogChecker = &logChecker{}
|
|
|
|
type logChecker struct {
|
|
provider LogProvider
|
|
names []string
|
|
ingested []bool
|
|
ingestionPred IngestionPred
|
|
timeoutFun TimeoutFun
|
|
}
|
|
|
|
// NewLogChecker constructs a LogChecker for a list of names from custom
|
|
// IngestionPred and TimeoutFun.
|
|
func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
|
|
return &logChecker{
|
|
provider: p,
|
|
names: names,
|
|
ingested: make([]bool, len(names)),
|
|
ingestionPred: pred,
|
|
timeoutFun: timeout,
|
|
}
|
|
}
|
|
|
|
func (c *logChecker) EntriesIngested() (bool, error) {
|
|
allIngested := true
|
|
for i, name := range c.names {
|
|
if c.ingested[i] {
|
|
continue
|
|
}
|
|
entries := c.provider.ReadEntries(name)
|
|
ingested, err := c.ingestionPred(name, entries)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if ingested {
|
|
c.ingested[i] = true
|
|
}
|
|
allIngested = allIngested && ingested
|
|
}
|
|
return allIngested, nil
|
|
}
|
|
|
|
func (c *logChecker) Timeout() error {
|
|
return c.timeoutFun(c.names, c.ingested)
|
|
}
|
|
|
|
// NumberedIngestionPred is a IngestionPred that takes into account sequential
|
|
// numbers of ingested entries.
|
|
type NumberedIngestionPred func(string, map[int]bool) (bool, error)
|
|
|
|
// NumberedTimeoutFun is a TimeoutFun that takes into account sequential
|
|
// numbers of ingested entries.
|
|
type NumberedTimeoutFun func([]string, map[string]map[int]bool) error
|
|
|
|
// NewNumberedLogChecker returns a log checker that works with numbered log
|
|
// entries generated by load logging pods.
|
|
func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
|
|
timeout NumberedTimeoutFun, names ...string) LogChecker {
|
|
occs := map[string]map[int]bool{}
|
|
return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
|
|
occ, ok := occs[name]
|
|
if !ok {
|
|
occ = map[int]bool{}
|
|
occs[name] = occ
|
|
}
|
|
for _, entry := range entries {
|
|
if no, ok := entry.TryGetEntryNumber(); ok {
|
|
occ[no] = true
|
|
}
|
|
}
|
|
return pred(name, occ)
|
|
}, func(names []string, _ []bool) error {
|
|
return timeout(names, occs)
|
|
}, names...)
|
|
}
|
|
|
|
// NewFullIngestionPodLogChecker returns a log checks that works with numbered
|
|
// log entries generated by load logging pods and waits until all entries are
|
|
// ingested. If timeout is reached, fraction is lost logs up to slack is
|
|
// considered tolerable.
|
|
func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
|
|
podsMap := map[string]FiniteLoggingPod{}
|
|
for _, p := range pods {
|
|
podsMap[p.Name()] = p
|
|
}
|
|
return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
|
|
getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
|
|
}
|
|
|
|
func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
|
|
return func(name string, occ map[int]bool) (bool, error) {
|
|
p := podsMap[name]
|
|
ok := len(occ) == p.ExpectedLineCount()
|
|
return ok, nil
|
|
}
|
|
}
|
|
|
|
func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
|
|
return func(names []string, occs map[string]map[int]bool) error {
|
|
totalGot, totalWant := 0, 0
|
|
lossMsgs := []string{}
|
|
for _, name := range names {
|
|
got := len(occs[name])
|
|
want := podsMap[name].ExpectedLineCount()
|
|
if got != want {
|
|
lossMsg := fmt.Sprintf("%s: %d lines", name, want-got)
|
|
lossMsgs = append(lossMsgs, lossMsg)
|
|
}
|
|
totalGot += got
|
|
totalWant += want
|
|
}
|
|
if len(lossMsgs) > 0 {
|
|
framework.Logf("Still missing logs from:\n%s", strings.Join(lossMsgs, "\n"))
|
|
}
|
|
lostFrac := 1 - float64(totalGot)/float64(totalWant)
|
|
if lostFrac > slack {
|
|
return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
|
|
lostFrac*100, slack*100)
|
|
}
|
|
framework.Logf("Missing %.2f%% of logs, which is lower than the threshold %.2f%%",
|
|
lostFrac*100, slack*100)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WaitForLogs checks that logs are ingested, as reported by the log checker
|
|
// until the timeout has passed. Function sleeps for interval between two
|
|
// log ingestion checks.
|
|
func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
|
|
err := wait.Poll(interval, timeout, func() (bool, error) {
|
|
return c.EntriesIngested()
|
|
})
|
|
if err == wait.ErrWaitTimeout {
|
|
return c.Timeout()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
|
|
names := []string{}
|
|
for _, p := range pods {
|
|
names = append(names, p.Name())
|
|
}
|
|
return names
|
|
}
|