Take into account number of restarts in cluster logging tests
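
This replaces the positional arguments of waitForLogsIngestion with a single loggingTestConfig and makes the helper fail when fluentd pods restart more often than allowed. A condensed sketch of the call pattern after this change, taken from the hunks below (f, pod, and the provider come from the surrounding test; the timeout and limits are per-test choices):

    config := &loggingTestConfig{
        LogsProvider:              esLogsProvider, // or gclLogsProvider
        Pods:                      []*loggingPod{pod},
        IngestionTimeout:          10 * time.Minute,
        MaxAllowedLostFraction:    0,
        MaxAllowedFluentdRestarts: 0,
    }
    err = waitForLogsIngestion(f, config)
    framework.ExpectNoError(err, "Failed to ingest logs")

Internally, waitForLogsIngestion now lists the fluentd pods via getFluentdPods(f, config.LogsProvider.FluentdApplicationName()) and compares the highest per-pod container restart count against MaxAllowedFluentdRestarts.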
@@ -208,6 +208,7 @@ go_library(
         "//vendor:k8s.io/client-go/tools/cache",
         "//vendor:k8s.io/client-go/transport",
         "//vendor:k8s.io/client-go/util/flowcontrol",
+        "//vendor:k8s.io/client-go/util/integer",
         "//vendor:k8s.io/client-go/util/workqueue",
     ],
 )
@@ -51,11 +51,14 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
 
 		By("Waiting for logs to ingest")
-		err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
-		framework.ExpectNoError(err, "Failed to ingest logs")
-
-		if err != nil {
-			reportLogsFromFluentdPod(f, pod)
+		config := &loggingTestConfig{
+			LogsProvider:              esLogsProvider,
+			Pods:                      []*loggingPod{pod},
+			IngestionTimeout:          10 * time.Minute,
+			MaxAllowedLostFraction:    0,
+			MaxAllowedFluentdRestarts: 0,
 		}
+		err = waitForLogsIngestion(f, config)
+		framework.ExpectNoError(err, "Failed to ingest logs")
 	})
 })
@@ -46,6 +46,10 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
 	return &esLogsProvider{Framework: f}, nil
 }
 
+func (logsProvider *esLogsProvider) FluentdApplicationName() string {
+	return "fluentd-es"
+}
+
 // Ensures that elasticsearch is running and ready to serve requests
 func (logsProvider *esLogsProvider) EnsureWorking() error {
 	f := logsProvider.Framework
@@ -50,11 +50,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Flaky]", func()
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
 
 		By("Waiting for logs to ingest")
-		err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
-		framework.ExpectNoError(err, "Failed to ingest logs")
-
-		if err != nil {
-			reportLogsFromFluentdPod(f, pod)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      []*loggingPod{pod},
+			IngestionTimeout:          10 * time.Minute,
+			MaxAllowedLostFraction:    0,
+			MaxAllowedFluentdRestarts: 0,
 		}
+		err = waitForLogsIngestion(f, config)
+		framework.ExpectNoError(err, "Failed to ingest logs")
 	})
 })
@@ -29,6 +29,7 @@ import (
 const (
 	// TODO(crassirostris): Once test is stable, decrease allowed loses
 	loadTestMaxAllowedLostFraction = 0.1
+	loadTestMaxAllowedFluentdRestarts = 1
 )
 
 // TODO(crassirostris): Remove Flaky once test is stable
@@ -58,7 +59,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
 		time.Sleep(loggingDuration)
 
 		By("Waiting for all log lines to be ingested")
-		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      pods,
+			IngestionTimeout:          ingestionTimeout,
+			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
+			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
+		}
+		err = waitForLogsIngestion(f, config)
 		if err != nil {
 			framework.Failf("Failed to ingest logs: %v", err)
 		} else {
@@ -96,7 +104,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
 		time.Sleep(jobDuration)
 
 		By("Waiting for all log lines to be ingested")
-		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      pods,
+			IngestionTimeout:          ingestionTimeout,
+			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
+			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
+		}
+		err = waitForLogsIngestion(f, config)
 		if err != nil {
 			framework.Failf("Failed to ingest logs: %v", err)
 		} else {
@@ -64,6 +64,10 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
 	return provider, nil
 }
 
+func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
+	return "fluentd-gcp"
+}
+
 // Since GCL API is not easily available from the outside of cluster
 // we use gcloud command to perform search with filter
 func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
@@ -17,7 +17,6 @@ limitations under the License.
 package e2e
 
 import (
-	"errors"
 	"fmt"
 	"strconv"
 	"strings"
@@ -26,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/util/integer"
 	"k8s.io/kubernetes/pkg/api"
 	api_v1 "k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -63,10 +63,19 @@ type logEntry struct {
 }
 
 type logsProvider interface {
+	FluentdApplicationName() string
 	EnsureWorking() error
 	ReadEntries(*loggingPod) []*logEntry
 }
 
+type loggingTestConfig struct {
+	LogsProvider              logsProvider
+	Pods                      []*loggingPod
+	IngestionTimeout          time.Duration
+	MaxAllowedLostFraction    float64
+	MaxAllowedFluentdRestarts int
+}
+
 func (entry *logEntry) getLogEntryNumber() (int, bool) {
 	chunks := strings.Split(entry.Payload, " ")
 	lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0]))
@@ -123,27 +132,27 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount i
 	})
 }
 
-func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error {
+func waitForLogsIngestion(f *framework.Framework, config *loggingTestConfig) error {
 	expectedLinesNumber := 0
-	for _, pod := range pods {
+	for _, pod := range config.Pods {
 		expectedLinesNumber += pod.ExpectedLinesNumber
 	}
 
 	totalMissing := expectedLinesNumber
 
-	missingByPod := make([]int, len(pods))
-	for podIdx, pod := range pods {
+	missingByPod := make([]int, len(config.Pods))
+	for podIdx, pod := range config.Pods {
 		missingByPod[podIdx] = pod.ExpectedLinesNumber
 	}
 
-	for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
+	for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
 		missing := 0
-		for podIdx, pod := range pods {
+		for podIdx, pod := range config.Pods {
 			if missingByPod[podIdx] == 0 {
 				continue
 			}
 
-			missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod)
+			missingByPod[podIdx] = pullMissingLogsCount(config.LogsProvider, pod)
 			missing += missingByPod[podIdx]
 		}
 
@@ -156,13 +165,32 @@ func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingesti
 	lostFraction := float64(totalMissing) / float64(expectedLinesNumber)
 
 	if totalMissing > 0 {
-		framework.Logf("After %v still missing %d lines, %.2f%% of total number oflines",
-			ingestionTimeout, totalMissing, lostFraction*100)
+		framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
+			config.IngestionTimeout, totalMissing, lostFraction*100)
 	}
 
-	if lostFraction > maxAllowedLostFraction {
+	if lostFraction > config.MaxAllowedLostFraction {
 		return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
-			lostFraction*100, maxAllowedLostFraction*100)
+			lostFraction*100, config.MaxAllowedLostFraction*100)
+	}
+
+	fluentdPods, err := getFluentdPods(f, config.LogsProvider.FluentdApplicationName())
+	if err != nil {
+		return fmt.Errorf("failed to get fluentd pods due to %v", err)
+	}
+
+	maxRestartCount := 0
+	for _, fluentdPod := range fluentdPods.Items {
+		restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount)
+		maxRestartCount = integer.IntMax(maxRestartCount, restartCount)
+
+		framework.Logf("Fluentd pod %s on node %s was restarted %d times",
+			fluentdPod.Name, fluentdPod.Spec.NodeName, restartCount)
+	}
+
+	if maxRestartCount > config.MaxAllowedFluentdRestarts {
+		return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d",
+			maxRestartCount, config.MaxAllowedFluentdRestarts)
 	}
 
 	return nil
@@ -211,32 +239,8 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro
 	return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
 }
 
-func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error {
-	synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{})
-	if err != nil {
-		return fmt.Errorf("failed to get synth logger pod due to %v", err)
-	}
-
-	synthLoggerNodeName := synthLoggerPod.Spec.NodeName
-	if synthLoggerNodeName == "" {
-		return errors.New("Synthlogger pod is not assigned to the node")
-	}
-
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
+func getFluentdPods(f *framework.Framework, fluentdApplicationName string) (*api_v1.PodList, error) {
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": fluentdApplicationName}))
 	options := meta_v1.ListOptions{LabelSelector: label.String()}
-	fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
-
-	for _, fluentdPod := range fluentdPods.Items {
-		if fluentdPod.Spec.NodeName == synthLoggerNodeName {
-			containerName := fluentdPod.Spec.Containers[0].Name
-			logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName)
-			if err != nil {
-				return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
-			}
-			framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs)
-			return nil
-		}
-	}
-
-	return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName)
+	return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
 }