Refactor logging e2e tests, add new checks

parent ae1ff1a2d4
commit 86a2ac9433
@@ -21,8 +21,7 @@ go_test(
         "//test/e2e/apimachinery:go_default_library",
         "//test/e2e/autoscaling:go_default_library",
         "//test/e2e/framework:go_default_library",
-        "//test/e2e/instrumentation/logging:go_default_library",
-        "//test/e2e/instrumentation/monitoring:go_default_library",
+        "//test/e2e/instrumentation:go_default_library",
         "//test/e2e/kubectl:go_default_library",
         "//test/e2e/lifecycle:go_default_library",
         "//test/e2e/lifecycle/bootstrap:go_default_library",
@@ -22,8 +22,7 @@ import (
 	_ "k8s.io/kubernetes/test/e2e/apimachinery"
 	_ "k8s.io/kubernetes/test/e2e/autoscaling"
 	"k8s.io/kubernetes/test/e2e/framework"
-	_ "k8s.io/kubernetes/test/e2e/instrumentation/logging"
-	_ "k8s.io/kubernetes/test/e2e/instrumentation/monitoring"
+	_ "k8s.io/kubernetes/test/e2e/instrumentation"
 	_ "k8s.io/kubernetes/test/e2e/kubectl"
 	_ "k8s.io/kubernetes/test/e2e/lifecycle"
 	_ "k8s.io/kubernetes/test/e2e/lifecycle/bootstrap"
@@ -9,9 +9,12 @@ load(

 go_library(
     name = "go_default_library",
-    srcs = ["framework.go"],
+    srcs = ["imports.go"],
     tags = ["automanaged"],
-    deps = ["//vendor/github.com/onsi/ginkgo:go_default_library"],
+    deps = [
+        "//test/e2e/instrumentation/logging:go_default_library",
+        "//test/e2e/instrumentation/monitoring:go_default_library",
+    ],
 )

 filegroup(
@@ -25,6 +28,7 @@ filegroup(
     name = "all-srcs",
     srcs = [
         ":package-srcs",
+        "//test/e2e/instrumentation/common:all-srcs",
         "//test/e2e/instrumentation/logging:all-srcs",
         "//test/e2e/instrumentation/monitoring:all-srcs",
     ],
test/e2e/instrumentation/common/BUILD (new file, 28 lines)
@@ -0,0 +1,28 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["framework.go"],
+    tags = ["automanaged"],
+    deps = ["//vendor/github.com/onsi/ginkgo:go_default_library"],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
@@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package instrumentation
+package common

 import "github.com/onsi/ginkgo"

+// SIGDescribe annotates the test with the SIG label.
 func SIGDescribe(text string, body func()) bool {
 	return ginkgo.Describe("[sig-instrumentation] "+text, body)
 }
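All refactored suites register through this one helper, so every spec name carries the `[sig-instrumentation]` prefix. A minimal usage sketch (the suite and spec names here are illustrative, not from this commit):

```go
package common_test

import (
	"github.com/onsi/ginkgo"

	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
)

// ginkgo.Describe (wrapped by SIGDescribe) returns a bool, so assigning
// to a package-level blank var registers the spec at package init time.
var _ = instrumentation.SIGDescribe("My feature", func() {
	ginkgo.It("does something", func() {
		// assertions would go here
	})
})
```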
test/e2e/instrumentation/imports.go (new file, 22 lines)
@@ -0,0 +1,22 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package instrumentation
+
+import (
+	_ "k8s.io/kubernetes/test/e2e/instrumentation/logging"
+	_ "k8s.io/kubernetes/test/e2e/instrumentation/monitoring"
+)
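These blank imports do real work: importing a package, even only for its side effects, runs that package's variable initializers, and the ginkgo specs in the imported packages are registered exactly that way. A runnable single-file analogy of the mechanism (names are illustrative):

```go
package main

import "fmt"

// Package-level initializers run before main. Blank-importing a test
// package triggers the same step, which is when `var _ = SIGDescribe(...)`
// registers its specs with ginkgo.
var registered = register()

func register() bool {
	fmt.Println("spec registered")
	return true
}

func main() {
	fmt.Println("already registered:", registered)
}
```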
@@ -10,34 +10,18 @@ load(
 go_library(
     name = "go_default_library",
     srcs = [
-        "es.go",
-        "es_kibana.go",
-        "es_utils.go",
         "generic_soak.go",
-        "sd.go",
-        "sd_events.go",
-        "sd_soak.go",
-        "sd_utils.go",
-        "utils.go",
+        "imports.go",
     ],
     tags = ["automanaged"],
     deps = [
         "//pkg/api:go_default_library",
         "//test/e2e/framework:go_default_library",
-        "//test/e2e/instrumentation:go_default_library",
+        "//test/e2e/instrumentation/common:go_default_library",
+        "//test/e2e/instrumentation/logging/elasticsearch:go_default_library",
+        "//test/e2e/instrumentation/logging/stackdrvier:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
-        "//vendor/golang.org/x/net/context:go_default_library",
-        "//vendor/golang.org/x/oauth2/google:go_default_library",
-        "//vendor/google.golang.org/api/logging/v2beta1:go_default_library",
-        "//vendor/google.golang.org/api/pubsub/v1:go_default_library",
-        "//vendor/k8s.io/api/core/v1:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
-        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
-        "//vendor/k8s.io/client-go/util/integer:go_default_library",
     ],
 )

@@ -50,6 +34,11 @@ filegroup(

 filegroup(
     name = "all-srcs",
-    srcs = [":package-srcs"],
+    srcs = [
+        ":package-srcs",
+        "//test/e2e/instrumentation/logging/elasticsearch:all-srcs",
+        "//test/e2e/instrumentation/logging/stackdrvier:all-srcs",
+        "//test/e2e/instrumentation/logging/utils:all-srcs",
+    ],
     tags = ["automanaged"],
 )

test/e2e/instrumentation/logging/elasticsearch/BUILD (new file, 43 lines)
@@ -0,0 +1,43 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "basic.go",
+        "kibana.go",
+        "utils.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//test/e2e/framework:go_default_library",
+        "//test/e2e/instrumentation/common:go_default_library",
+        "//test/e2e/instrumentation/logging/utils:go_default_library",
+        "//vendor/github.com/onsi/ginkgo:go_default_library",
+        "//vendor/github.com/onsi/gomega:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
@@ -14,55 +14,48 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package logging
+package elasticsearch

 import (
-	"fmt"
 	"time"

-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
+	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"

-	. "github.com/onsi/ginkgo"
+	"github.com/onsi/ginkgo"
 )

 var _ = instrumentation.SIGDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
 	f := framework.NewDefaultFramework("es-logging")

-	BeforeEach(func() {
+	ginkgo.BeforeEach(func() {
 		// TODO: For now assume we are only testing cluster logging with Elasticsearch
 		// on GCE. Once we are sure that Elasticsearch cluster level logging
 		// works for other providers we should widen this scope of this test.
 		framework.SkipUnlessProviderIs("gce")
 	})

-	It("should check that logs from containers are ingested into Elasticsearch", func() {
-		podName := "synthlogger"
-		esLogsProvider, err := newEsLogsProvider(f)
+	ginkgo.It("should check that logs from containers are ingested into Elasticsearch", func() {
+		ingestionInterval := 10 * time.Second
+		ingestionTimeout := 10 * time.Minute
+
+		p, err := newEsLogProvider(f)
 		framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider")

-		err = esLogsProvider.Init()
-		defer esLogsProvider.Cleanup()
+		err = p.Init()
+		defer p.Cleanup()
 		framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider")

-		err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName())
+		err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName())
 		framework.ExpectNoError(err, "Fluentd deployed incorrectly")

-		By("Running synthetic logger")
-		pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
-		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
-		err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
-		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))
+		pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger", "test"), f)
+		framework.ExpectNoError(err, "Failed to start a pod")

-		By("Waiting for logs to ingest")
-		config := &loggingTestConfig{
-			LogsProvider:              esLogsProvider,
-			Pods:                      []*loggingPod{pod},
-			IngestionTimeout:          10 * time.Minute,
-			MaxAllowedLostFraction:    0,
-			MaxAllowedFluentdRestarts: 0,
-		}
-		framework.ExpectNoError(waitForSomeLogs(f, config), "Failed to ingest logs")
+		ginkgo.By("Waiting for logs to ingest")
+		c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name())
+		err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+		framework.ExpectNoError(err)
 	})
 })
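basic.go now drives the backend only through the `utils.LogProvider` contract. The interface itself is defined in `test/e2e/instrumentation/logging/utils`, which this diff does not show; reconstructed from the compile-time assertion in es_utils.go and the methods implemented there, it is roughly:

```go
package utils

// LogProvider abstracts a logging backend (Elasticsearch, Stackdriver).
// Sketched from this commit's call sites; the real definition may differ
// in details.
type LogProvider interface {
	// Init brings the backend into a usable state or returns an error.
	Init() error
	// Cleanup releases whatever Init acquired.
	Cleanup()
	// ReadEntries returns entries ingested so far for a pod or node name.
	ReadEntries(name string) []LogEntry
	// LoggingAgentName names the agent deployment, e.g. "fluentd-es".
	LoggingAgentName() string
}
```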
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package logging
+package elasticsearch

 import (
 	"context"
@@ -23,23 +23,24 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"

-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
+	"github.com/onsi/ginkgo"
+	"github.com/onsi/gomega"
+	"k8s.io/apimachinery/pkg/util/wait"
 )

 var _ = instrumentation.SIGDescribe("Kibana Logging Instances Is Alive [Feature:Elasticsearch]", func() {
 	f := framework.NewDefaultFramework("kibana-logging")

-	BeforeEach(func() {
+	ginkgo.BeforeEach(func() {
 		// TODO: For now assume we are only testing cluster logging with Elasticsearch
 		// and Kibana on GCE. Once we are sure that Elasticsearch and Kibana cluster level logging
 		// works for other providers we should widen this scope of this test.
 		framework.SkipUnlessProviderIs("gce")
 	})

-	It("should check that the Kibana logging instance is alive", func() {
+	ginkgo.It("should check that the Kibana logging instance is alive", func() {
 		ClusterLevelLoggingWithKibana(f)
 	})
 })
@@ -51,61 +52,54 @@ const (

 // ClusterLevelLoggingWithKibana is an end to end test that checks to see if Kibana is alive.
 func ClusterLevelLoggingWithKibana(f *framework.Framework) {
-	// graceTime is how long to keep retrying requests for status information.
-	const graceTime = 20 * time.Minute
+	const pollingInterval = 10 * time.Second
+	const pollingTimeout = 20 * time.Minute

 	// Check for the existence of the Kibana service.
-	By("Checking the Kibana service exists.")
+	ginkgo.By("Checking the Kibana service exists.")
 	s := f.ClientSet.Core().Services(metav1.NamespaceSystem)
 	// Make a few attempts to connect. This makes the test robust against
 	// being run as the first e2e test just after the e2e cluster has been created.
-	var err error
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
-		if _, err = s.Get("kibana-logging", metav1.GetOptions{}); err == nil {
-			break
+	err := wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) {
+		if _, err := s.Get("kibana-logging", metav1.GetOptions{}); err != nil {
+			framework.Logf("Kibana is unreachable: %v", err)
+			return false, nil
 		}
-		framework.Logf("Attempt to check for the existence of the Kibana service failed after %v", time.Since(start))
-	}
-	Expect(err).NotTo(HaveOccurred())
+		return true, nil
+	})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())

 	// Wait for the Kibana pod(s) to enter the running state.
-	By("Checking to make sure the Kibana pods are running")
+	ginkgo.By("Checking to make sure the Kibana pods are running")
 	label := labels.SelectorFromSet(labels.Set(map[string]string{kibanaKey: kibanaValue}))
 	options := metav1.ListOptions{LabelSelector: label.String()}
 	pods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options)
-	Expect(err).NotTo(HaveOccurred())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	for _, pod := range pods.Items {
 		err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
-		Expect(err).NotTo(HaveOccurred())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}

-	By("Checking to make sure we get a response from the Kibana UI.")
-	err = nil
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
-		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
-		if errProxy != nil {
-			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
-			err = errProxy
-			continue
+	ginkgo.By("Checking to make sure we get a response from the Kibana UI.")
+	err = wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) {
+		req, err := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
+		if err != nil {
+			framework.Logf("Failed to get services proxy request: %v", err)
+			return false, nil
 		}

 		ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
 		defer cancel()

 		// Query against the root URL for Kibana.
-		_, err = proxyRequest.Namespace(metav1.NamespaceSystem).
+		_, err = req.Namespace(metav1.NamespaceSystem).
 			Context(ctx).
 			Name("kibana-logging").
 			DoRaw()
 		if err != nil {
-			if ctx.Err() != nil {
-				framework.Failf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
-				break
-			}
-			framework.Logf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
-			continue
+			framework.Logf("Proxy call to kibana-logging failed: %v", err)
+			return false, nil
 		}
-		break
-	}
-	Expect(err).NotTo(HaveOccurred())
+		return true, nil
+	})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 }
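Both hand-rolled retry loops above are replaced with `wait.Poll` from `k8s.io/apimachinery/pkg/util/wait`. Its condition function returns `(done bool, err error)`: `(true, nil)` stops the poll successfully, `(false, nil)` retries after the interval until the timeout expires, and a non-nil error aborts immediately. A minimal runnable sketch:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// Poll every 100ms for up to 1s; wait.Poll returns nil once the
	// condition reports done, or an error if the timeout is reached.
	err := wait.Poll(100*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		if attempts < 3 {
			return false, nil // not ready yet, try again
		}
		return true, nil // done, stop polling
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```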
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package logging
+package elasticsearch

 import (
 	"encoding/json"
@@ -26,31 +26,35 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/test/e2e/framework"

-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
+	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
 )

 const (
 	// esRetryTimeout is how long to keep retrying requesting elasticsearch for status information.
 	esRetryTimeout = 5 * time.Minute

 	// esRetryDelay is how much time to wait between two attempts to send a request to elasticsearch
 	esRetryDelay = 5 * time.Second
+
+	// searchPageSize is how many entries to search for in Elasticsearch.
+	searchPageSize = 1000
 )

-type esLogsProvider struct {
+var _ utils.LogProvider = &esLogProvider{}
+
+type esLogProvider struct {
 	Framework *framework.Framework
 }

-func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
-	return &esLogsProvider{Framework: f}, nil
+func newEsLogProvider(f *framework.Framework) (*esLogProvider, error) {
+	return &esLogProvider{Framework: f}, nil
 }

 // Ensures that elasticsearch is running and ready to serve requests
-func (logsProvider *esLogsProvider) Init() error {
-	f := logsProvider.Framework
+func (p *esLogProvider) Init() error {
+	f := p.Framework
 	// Check for the existence of the Elasticsearch service.
-	By("Checking the Elasticsearch service exists.")
+	framework.Logf("Checking the Elasticsearch service exists.")
 	s := f.ClientSet.Core().Services(api.NamespaceSystem)
 	// Make a few attempts to connect. This makes the test robust against
 	// being run as the first e2e test just after the e2e cluster has been created.
@@ -61,20 +65,26 @@ func (logsProvider *esLogsProvider) Init() error {
 		}
 		framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
 	}
-	Expect(err).NotTo(HaveOccurred())
+	if err != nil {
+		return err
+	}

 	// Wait for the Elasticsearch pods to enter the running state.
-	By("Checking to make sure the Elasticsearch pods are running")
+	framework.Logf("Checking to make sure the Elasticsearch pods are running")
 	labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
 	options := meta_v1.ListOptions{LabelSelector: labelSelector}
 	pods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
-	Expect(err).NotTo(HaveOccurred())
+	if err != nil {
+		return err
+	}
 	for _, pod := range pods.Items {
 		err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
-		Expect(err).NotTo(HaveOccurred())
+		if err != nil {
+			return err
+		}
 	}

-	By("Checking to make sure we are talking to an Elasticsearch service.")
+	framework.Logf("Checking to make sure we are talking to an Elasticsearch service.")
 	// Perform a few checks to make sure this looks like an Elasticsearch cluster.
 	var statusCode int
 	err = nil
@@ -102,14 +112,16 @@ func (logsProvider *esLogsProvider) Init() error {
 		}
 		break
 	}
-	Expect(err).NotTo(HaveOccurred())
+	if err != nil {
+		return err
+	}
 	if int(statusCode) != 200 {
 		framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
 	}

 	// Now assume we really are talking to an Elasticsearch instance.
 	// Check the cluster health.
-	By("Checking health of Elasticsearch service.")
+	framework.Logf("Checking health of Elasticsearch service.")
 	healthy := false
 	for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
 		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
@@ -153,12 +165,12 @@ func (logsProvider *esLogsProvider) Init() error {
 	return nil
 }

-func (logsProvider *esLogsProvider) Cleanup() {
+func (p *esLogProvider) Cleanup() {
 	// Nothing to do
 }

-func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
-	f := logsProvider.Framework
+func (p *esLogProvider) ReadEntries(name string) []utils.LogEntry {
+	f := p.Framework

 	proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
 	if errProxy != nil {
@@ -166,7 +178,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 		return nil
 	}

-	query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", pod.Name, f.Namespace.Name)
+	query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", name, f.Namespace.Name)
 	framework.Logf("Sending a search request to Elasticsearch with the following query: %s", query)

 	// Ask Elasticsearch to return all the log lines that were tagged with the
@@ -176,7 +188,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 		Suffix("_search").
 		Param("q", query).
 		// Ask for more in case we included some unrelated records in our query
-		Param("size", strconv.Itoa(pod.ExpectedLinesNumber*10)).
+		Param("size", strconv.Itoa(searchPageSize)).
 		DoRaw()
 	if err != nil {
 		framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
@@ -202,7 +214,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 		return nil
 	}

-	entries := []logEntry{}
+	entries := []utils.LogEntry{}
 	// Iterate over the hits and populate the observed array.
 	for _, e := range h {
 		l, ok := e.(map[string]interface{})
@@ -218,17 +230,23 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
 		}

 		msg, ok := source["log"].(string)
-		if !ok {
-			framework.Logf("Log not of the expected type: %T", source["log"])
+		if ok {
+			entries = append(entries, utils.LogEntry{TextPayload: msg})
 			continue
 		}

-		entries = append(entries, logEntry{Payload: msg})
+		obj, ok := source["log"].(map[string]interface{})
+		if ok {
+			entries = append(entries, utils.LogEntry{JSONPayload: obj})
+			continue
+		}
+
+		framework.Logf("Log is of unknown type, got %v, want string or object in field 'log'", source)
 	}

 	return entries
 }

-func (logsProvider *esLogsProvider) FluentdApplicationName() string {
+func (p *esLogProvider) LoggingAgentName() string {
 	return "fluentd-es"
 }
@@ -27,7 +27,7 @@ import (
 	. "github.com/onsi/gomega"
 	"k8s.io/api/core/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
 )

 var _ = instrumentation.SIGDescribe("Logging soak [Performance] [Slow] [Disruptive]", func() {
test/e2e/instrumentation/logging/imports.go (new file, 22 lines)
@@ -0,0 +1,22 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package logging
+
+import (
+	_ "k8s.io/kubernetes/test/e2e/instrumentation/logging/elasticsearch"
+	_ "k8s.io/kubernetes/test/e2e/instrumentation/logging/stackdrvier"
+)
@@ -1,66 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package logging
-
-import (
-	"fmt"
-	"time"
-
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
-
-	. "github.com/onsi/ginkgo"
-)
-
-var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() {
-	f := framework.NewDefaultFramework("sd-logging")
-
-	BeforeEach(func() {
-		framework.SkipUnlessProviderIs("gce", "gke")
-	})
-
-	It("should ingest logs from applications", func() {
-		podName := "synthlogger"
-
-		sdLogsProvider, err := newSdLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
-
-		err = sdLogsProvider.Init()
-		defer sdLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
-
-		err = ensureSingleFluentdOnEachNode(f, sdLogsProvider.FluentdApplicationName())
-		framework.ExpectNoError(err, "Fluentd deployed incorrectly")
-
-		By("Running synthetic logger")
-		pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute)
-		defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
-		err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name)
-		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName))
-
-		By("Waiting for logs to ingest")
-		config := &loggingTestConfig{
-			LogsProvider:              sdLogsProvider,
-			Pods:                      []*loggingPod{pod},
-			IngestionTimeout:          10 * time.Minute,
-			MaxAllowedLostFraction:    0,
-			MaxAllowedFluentdRestarts: 0,
-		}
-		framework.ExpectNoError(waitForSomeLogs(f, config), "Failed to ingest logs")
-	})
-})
@@ -1,94 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package logging
-
-import (
-	"time"
-
-	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
-
-	. "github.com/onsi/ginkgo"
-)
-
-const (
-	// eventsIngestionTimeout is the amount of time to wait until some
-	// events are ingested.
-	eventsIngestionTimeout = 10 * time.Minute
-
-	// eventPollingInterval is the delay between attempts to read events
-	// from the logs provider.
-	eventPollingInterval = 1 * time.Second
-
-	// eventCreationInterval is the minimal delay between two events
-	// created for testing purposes.
-	eventCreationInterval = 10 * time.Second
-)
-
-var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() {
-	f := framework.NewDefaultFramework("sd-logging-events")
-
-	BeforeEach(func() {
-		framework.SkipUnlessProviderIs("gce", "gke")
-	})
-
-	It("should ingest events", func() {
-		sdLogsProvider, err := newSdLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
-
-		err = sdLogsProvider.Init()
-		defer sdLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
-
-		stopCh := make(chan struct{})
-		successCh := make(chan struct{})
-		go func() {
-			wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) {
-				events := sdLogsProvider.ReadEvents()
-				if len(events) > 0 {
-					framework.Logf("Some events are ingested, sample event: %v", events[0])
-					close(successCh)
-					return true, nil
-				}
-				return false, nil
-			})
-			close(stopCh)
-		}()
-
-		By("Running pods to generate events while waiting for some of them to be ingested")
-		wait.PollUntil(eventCreationInterval, func() (bool, error) {
-			podName := "synthlogger"
-			startNewLoggingPod(f, podName, "", 1, 1*time.Second)
-			defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
-			err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
-			if err != nil {
-				framework.Logf("Failed to wait pod %s to successfully complete due to %v", podName, err)
-			}
-
-			return false, nil
-		}, stopCh)
-
-		select {
-		case <-successCh:
-			break
-		default:
-			framework.Failf("No events are present in Stackdriver after %v", eventsIngestionTimeout)
-		}
-	})
-})
@@ -1,112 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package logging
-
-import (
-	"fmt"
-	"math"
-	"time"
-
-	"k8s.io/kubernetes/test/e2e/framework"
-	"k8s.io/kubernetes/test/e2e/instrumentation"
-
-	. "github.com/onsi/ginkgo"
-)
-
-const (
-	// maxAllowedLostFraction is the fraction of lost logs considered acceptable.
-	maxAllowedLostFraction = 0.01
-	// maxAllowedRestartsPerHour is the number of fluentd container restarts
-	// considered acceptable. Once per hour is fine for now, as long as it
-	// doesn't loose too much logs.
-	maxAllowedRestartsPerHour = 1.0
-	// lastPodIngestionSlack is the amount of time to wait for the last pod's
-	// logs to be ingested by the logging agent.
-	lastPodIngestionSlack = 5 * time.Minute
-)
-
-var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
-	f := framework.NewDefaultFramework("sd-logging-load")
-
-	It("should ingest logs from applications running for a prolonged amount of time", func() {
-		sdLogsProvider, err := newSdLogsProvider(f)
-		framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
-
-		err = sdLogsProvider.Init()
-		defer sdLogsProvider.Cleanup()
-		framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
-
-		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
-		maxPodCount := 10
-		jobDuration := 30 * time.Minute
-		linesPerPodPerSecond := 100
-		// TODO(crassirostris): Increase to 21 hrs
-		testDuration := 3 * time.Hour
-		ingestionTimeout := testDuration + 30*time.Minute
-		allowedRestarts := int(math.Ceil(float64(testDuration) /
-			float64(time.Hour) * maxAllowedRestartsPerHour))
-
-		podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
-		podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
-		linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
-
-		// pods is a flat array of all pods to be run and to expect in Stackdriver.
-		pods := []*loggingPod{}
-		// podsByRun is a two-dimensional array of pods, first dimension is the run
-		// index, the second dimension is the node index. Since we want to create
-		// an equal load on all nodes, for the same run we have one pod per node.
-		podsByRun := [][]*loggingPod{}
-		for runIdx := 0; runIdx < podRunCount; runIdx++ {
-			podsInRun := []*loggingPod{}
-			for nodeIdx, node := range nodes {
-				podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
-				pod := newLoggingPod(podName, node.Name, linesPerPod, jobDuration)
-				pods = append(pods, pod)
-				podsInRun = append(podsInRun, pod)
-			}
-			podsByRun = append(podsByRun, podsInRun)
-		}
-
-		By("Running short-living pods")
-		go func() {
-			for runIdx := 0; runIdx < podRunCount; runIdx++ {
-				// Starting one pod on each node.
-				for _, pod := range podsByRun[runIdx] {
-					pod.Start(f)
-				}
-				time.Sleep(podRunDelay)
-			}
-			// Waiting until the last pod has completed
-			time.Sleep(jobDuration - podRunDelay + lastPodIngestionSlack)
-		}()
-
-		By("Waiting for all log lines to be ingested")
-		config := &loggingTestConfig{
-			LogsProvider:              sdLogsProvider,
-			Pods:                      pods,
-			IngestionTimeout:          ingestionTimeout,
-			MaxAllowedLostFraction:    maxAllowedLostFraction,
-			MaxAllowedFluentdRestarts: allowedRestarts,
-		}
-		err = waitForFullLogsIngestion(f, config)
-		if err != nil {
-			framework.Failf("Failed to ingest logs: %v", err)
-		} else {
-			framework.Logf("Successfully ingested all logs")
-		}
-	})
-})
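The load arithmetic in this soak test survives unchanged in the new `stackdrvier/soak.go` below, and it is compact enough to misread. A runnable sketch with the test's constants plugged in:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	maxPodCount := 10
	jobDuration := 30 * time.Minute
	linesPerPodPerSecond := 100
	testDuration := 3 * time.Hour
	maxAllowedRestartsPerHour := 1.0

	podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
	podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
	linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
	allowedRestarts := int(math.Ceil(float64(testDuration) /
		float64(time.Hour) * maxAllowedRestartsPerHour))

	// A new batch of pods (one per node) starts every 3 minutes; 51 batches
	// keep the load constant across the 3h window; each pod emits 180000
	// lines; up to 3 fluentd restarts are tolerated.
	fmt.Println(podRunDelay, podRunCount, linesPerPod, allowedRestarts)
	// Output: 3m0s 51 180000 3
}
```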
@@ -1,356 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package logging
-
-import (
-	"encoding/base64"
-	"encoding/json"
-	"fmt"
-	"sync"
-	"time"
-
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/kubernetes/test/e2e/framework"
-
-	"golang.org/x/net/context"
-	"golang.org/x/oauth2/google"
-	sd "google.golang.org/api/logging/v2beta1"
-	pubsub "google.golang.org/api/pubsub/v1"
-)
-
-const (
-	// The amount of time to wait for Stackdriver Logging
-	// sink to become operational
-	sinkStartupTimeout = 10 * time.Minute
-
-	// The limit on the number of messages to pull from PubSub
-	maxPullLogMessages = 100 * 1000
-
-	// The limit on the number of messages in the single cache
-	maxCacheSize = 10 * 1000
-
-	// PubSub topic with log entries polling interval
-	sdLoggingPollInterval = 100 * time.Millisecond
-)
-
-type sdLogsProvider struct {
-	SdService          *sd.Service
-	PubsubService      *pubsub.Service
-	Framework          *framework.Framework
-	Topic              *pubsub.Topic
-	Subscription       *pubsub.Subscription
-	LogSink            *sd.LogSink
-	LogEntryCache      map[string]chan logEntry
-	EventCache         chan map[string]interface{}
-	CacheMutex         *sync.Mutex
-	PollingStopChannel chan struct{}
-}
-
-func newSdLogsProvider(f *framework.Framework) (*sdLogsProvider, error) {
-	ctx := context.Background()
-	hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
-	sdService, err := sd.New(hc)
-	if err != nil {
-		return nil, err
-	}
-
-	pubsubService, err := pubsub.New(hc)
-	if err != nil {
-		return nil, err
-	}
-
-	provider := &sdLogsProvider{
-		SdService:          sdService,
-		PubsubService:      pubsubService,
-		Framework:          f,
-		LogEntryCache:      map[string]chan logEntry{},
-		EventCache:         make(chan map[string]interface{}, maxCacheSize),
-		CacheMutex:         &sync.Mutex{},
-		PollingStopChannel: make(chan struct{}, 1),
-	}
-	return provider, nil
-}
-
-func (sdLogsProvider *sdLogsProvider) Init() error {
-	projectId := framework.TestContext.CloudConfig.ProjectID
-	nsName := sdLogsProvider.Framework.Namespace.Name
-
-	topic, err := sdLogsProvider.createPubSubTopic(projectId, nsName)
-	if err != nil {
-		return fmt.Errorf("failed to create PubSub topic: %v", err)
-	}
-	sdLogsProvider.Topic = topic
-
-	subs, err := sdLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name)
-	if err != nil {
-		return fmt.Errorf("failed to create PubSub subscription: %v", err)
-	}
-	sdLogsProvider.Subscription = subs
-
-	logSink, err := sdLogsProvider.createSink(projectId, nsName, nsName, topic.Name)
-	if err != nil {
-		return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
-	}
-	sdLogsProvider.LogSink = logSink
-
-	if err = sdLogsProvider.authorizeSink(); err != nil {
-		return fmt.Errorf("failed to authorize log sink: %v", err)
-	}
-
-	if err = sdLogsProvider.waitSinkInit(); err != nil {
-		return fmt.Errorf("failed to wait for sink to become operational: %v", err)
-	}
-
-	go sdLogsProvider.pollLogs()
-
-	return nil
-}
-
-func (sdLogsProvider *sdLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) {
-	topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName)
-	topic := &pubsub.Topic{
-		Name: topicFullName,
-	}
-	return sdLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do()
-}
-
-func (sdLogsProvider *sdLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) {
-	subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName)
-	subs := &pubsub.Subscription{
-		Name:  subsFullName,
-		Topic: topicName,
-	}
-	return sdLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
-}
-
-func (sdLogsProvider *sdLogsProvider) createSink(projectId, nsName, sinkName, topicName string) (*sd.LogSink, error) {
-	projectDst := fmt.Sprintf("projects/%s", projectId)
-	filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+
-		"(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName)
-	framework.Logf("Using the following filter for entries: %s", filter)
-	sink := &sd.LogSink{
-		Name:        sinkName,
-		Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
-		Filter:      filter,
-	}
-	return sdLogsProvider.SdService.Projects.Sinks.Create(projectDst, sink).Do()
-}
-
-func (sdLogsProvider *sdLogsProvider) authorizeSink() error {
-	topicsService := sdLogsProvider.PubsubService.Projects.Topics
-	policy, err := topicsService.GetIamPolicy(sdLogsProvider.Topic.Name).Do()
-	if err != nil {
-		return err
-	}
-
-	binding := &pubsub.Binding{
-		Role:    "roles/pubsub.publisher",
-		Members: []string{sdLogsProvider.LogSink.WriterIdentity},
-	}
-	policy.Bindings = append(policy.Bindings, binding)
-	req := &pubsub.SetIamPolicyRequest{Policy: policy}
-	if _, err = topicsService.SetIamPolicy(sdLogsProvider.Topic.Name, req).Do(); err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (sdLogsProvider *sdLogsProvider) waitSinkInit() error {
-	framework.Logf("Waiting for log sink to become operational")
-	return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
-		err := publish(sdLogsProvider.PubsubService, sdLogsProvider.Topic, "embrace eternity")
-		if err != nil {
-			framework.Logf("Failed to push message to PubSub due to %v", err)
-		}
-
-		messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
-		if err != nil {
-			framework.Logf("Failed to pull messages from PubSub due to %v", err)
-			return false, nil
-		}
-		if len(messages) > 0 {
-			framework.Logf("Sink %s is operational", sdLogsProvider.LogSink.Name)
-			return true, nil
-		}
-
-		return false, nil
-	})
-}
-
-func (sdLogsProvider *sdLogsProvider) pollLogs() {
-	wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
-		messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription)
-		if err != nil {
-			framework.Logf("Failed to pull messages from PubSub due to %v", err)
-			return false, nil
-		}
-
-		for _, msg := range messages {
-			logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
-			if err != nil {
-				framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
-				continue
-			}
-
-			var sdLogEntry sd.LogEntry
-			if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
-				framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
-				continue
-			}
-
-			switch sdLogEntry.Resource.Type {
-			case "container":
-				podName := sdLogEntry.Resource.Labels["pod_id"]
-				ch := sdLogsProvider.getCacheChannel(podName)
-				ch <- logEntry{Payload: sdLogEntry.TextPayload}
-				break
-			case "gke_cluster":
-				jsonPayloadRaw, err := sdLogEntry.JsonPayload.MarshalJSON()
-				if err != nil {
-					framework.Logf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
-					break
-				}
-				var eventObject map[string]interface{}
-				err = json.Unmarshal(jsonPayloadRaw, &eventObject)
-				if err != nil {
-					framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:]))
-					break
-				}
-				sdLogsProvider.EventCache <- eventObject
-				break
-			default:
-				framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
-				break
-			}
-		}
-
-		return false, nil
-	}, sdLogsProvider.PollingStopChannel)
-}
-
-func (sdLogsProvider *sdLogsProvider) Cleanup() {
-	sdLogsProvider.PollingStopChannel <- struct{}{}
-
-	if sdLogsProvider.LogSink != nil {
-		projectId := framework.TestContext.CloudConfig.ProjectID
-		sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, sdLogsProvider.LogSink.Name)
-		sinksService := sdLogsProvider.SdService.Projects.Sinks
-		if _, err := sinksService.Delete(sinkNameId).Do(); err != nil {
-			framework.Logf("Failed to delete LogSink: %v", err)
-		}
-	}
-
-	if sdLogsProvider.Subscription != nil {
-		subsService := sdLogsProvider.PubsubService.Projects.Subscriptions
-		if _, err := subsService.Delete(sdLogsProvider.Subscription.Name).Do(); err != nil {
-			framework.Logf("Failed to delete PubSub subscription: %v", err)
-		}
-	}
-
-	if sdLogsProvider.Topic != nil {
-		topicsService := sdLogsProvider.PubsubService.Projects.Topics
-		if _, err := topicsService.Delete(sdLogsProvider.Topic.Name).Do(); err != nil {
-			framework.Logf("Failed to delete PubSub topic: %v", err)
-		}
-	}
-}
-
-func (sdLogsProvider *sdLogsProvider) ReadEntries(pod *loggingPod) []logEntry {
-	var entries []logEntry
-	ch := sdLogsProvider.getCacheChannel(pod.Name)
-polling_loop:
-	for {
-		select {
-		case entry := <-ch:
-			entries = append(entries, entry)
-		default:
-			break polling_loop
-		}
-	}
-	return entries
-}
-
-func (logsProvider *sdLogsProvider) FluentdApplicationName() string {
-	return "fluentd-gcp"
-}
-
-func (sdLogsProvider *sdLogsProvider) ReadEvents() []map[string]interface{} {
-	var events []map[string]interface{}
-polling_loop:
-	for {
-		select {
-		case event := <-sdLogsProvider.EventCache:
-			events = append(events, event)
-		default:
-			break polling_loop
-		}
-	}
-	return events
-}
-
-func (sdLogsProvider *sdLogsProvider) getCacheChannel(podName string) chan logEntry {
-	sdLogsProvider.CacheMutex.Lock()
-	defer sdLogsProvider.CacheMutex.Unlock()
-
-	if ch, ok := sdLogsProvider.LogEntryCache[podName]; ok {
-		return ch
-	}
-
-	newCh := make(chan logEntry, maxCacheSize)
-	sdLogsProvider.LogEntryCache[podName] = newCh
-	return newCh
-}
-
-func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
-	subsService := service.Projects.Subscriptions
-	req := &pubsub.PullRequest{
-		ReturnImmediately: true,
-		MaxMessages:       maxPullLogMessages,
-	}
-
-	resp, err := subsService.Pull(subs.Name, req).Do()
-	if err != nil {
-		return nil, err
-	}
-
-	var ids []string
-	for _, msg := range resp.ReceivedMessages {
-		ids = append(ids, msg.AckId)
-	}
-	if len(ids) > 0 {
-		ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
-		if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
-			framework.Logf("Failed to ack poll: %v", err)
-		}
-	}
-
-	return resp.ReceivedMessages, nil
-}
-
-func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
-	topicsService := service.Projects.Topics
-	req := &pubsub.PublishRequest{
-		Messages: []*pubsub.PubsubMessage{
-			{
-				Data: base64.StdEncoding.EncodeToString([]byte(msg)),
-			},
-		},
-	}
-	_, err := topicsService.Publish(topic.Name, req).Do()
-	return err
-}
test/e2e/instrumentation/logging/stackdrvier/BUILD (new file, 44 lines)
@@ -0,0 +1,44 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "basic.go",
+        "soak.go",
+        "utils.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//test/e2e/framework:go_default_library",
+        "//test/e2e/instrumentation/common:go_default_library",
+        "//test/e2e/instrumentation/logging/utils:go_default_library",
+        "//vendor/github.com/onsi/ginkgo:go_default_library",
+        "//vendor/golang.org/x/net/context:go_default_library",
+        "//vendor/golang.org/x/oauth2/google:go_default_library",
+        "//vendor/google.golang.org/api/logging/v2beta1:go_default_library",
+        "//vendor/google.golang.org/api/pubsub/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
test/e2e/instrumentation/logging/stackdrvier/basic.go (new file, 152 lines)
@@ -0,0 +1,152 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package stackdriver
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/test/e2e/framework"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
+	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
+
+	"github.com/onsi/ginkgo"
+	"k8s.io/apimachinery/pkg/util/json"
+	"k8s.io/apimachinery/pkg/util/uuid"
+)
+
+const (
+	ingestionInterval = 10 * time.Second
+	ingestionTimeout  = 10 * time.Minute
+)
+
+var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() {
+	f := framework.NewDefaultFramework("sd-logging")
+
+	ginkgo.BeforeEach(func() {
+		framework.SkipUnlessProviderIs("gce", "gke")
+	})
+
+	ginkgo.It("should ingest logs", func() {
+
+		withLogProviderForScope(f, podsScope, func(p *sdLogProvider) {
+			ginkgo.By("Checking ingesting text logs", func() {
+				pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-1", "hey"), f)
+				framework.ExpectNoError(err, "Failed to start a pod")
+
+				ginkgo.By("Waiting for logs to ingest")
+				c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name())
+				err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+				framework.ExpectNoError(err)
+			})
+
+			ginkgo.By("Checking ingesting json logs", func() {
+				logRaw := "{\"a\":\"b\"}"
+				pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-2", logRaw), f)
+				framework.ExpectNoError(err, "Failed to start a pod")
+
+				ginkgo.By("Waiting for logs to ingest")
+				c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) {
+					if len(logEntries) == 0 {
+						return false, nil
+					}
+					log := logEntries[0]
+					if log.JSONPayload == nil {
+						return false, fmt.Errorf("log entry unexpectedly is not json: %s", log.TextPayload)
+					}
+					if log.JSONPayload["a"] != "b" {
+						bytes, err := json.Marshal(log.JSONPayload)
+						if err != nil {
+							return false, fmt.Errorf("log entry ingested incorrectly, failed to marshal: %v", err)
+						}
+						return false, fmt.Errorf("log entry ingested incorrectly, got %v, want %s",
+							string(bytes), logRaw)
+					}
+					return true, nil
+				}, utils.JustTimeout, pod.Name())
+				err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+				framework.ExpectNoError(err)
+			})
+
+			ginkgo.By("Checking ingesting logs in glog format", func() {
+				logUnformatted := "Text"
+				logRaw := fmt.Sprintf("I0101 00:00:00.000000       1 main.go:1] %s", logUnformatted)
+				pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-3", logRaw), f)
+				framework.ExpectNoError(err, "Failed to start a pod")
+
+				ginkgo.By("Waiting for logs to ingest")
+				c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) {
+					if len(logEntries) == 0 {
+						return false, nil
+					}
+					log := logEntries[0]
+					if log.TextPayload == "" {
+						return false, fmt.Errorf("log entry is unexpectedly json: %v", log.JSONPayload)
+					}
+					if log.TextPayload != logUnformatted {
+						return false, fmt.Errorf("log entry ingested incorrectly, got %s, want %s",
							log.TextPayload, logUnformatted)
+					}
+					return true, nil
+				}, utils.JustTimeout, pod.Name())
+				err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+				framework.ExpectNoError(err)
+			})
+		})
+	})
+
+	ginkgo.It("should ingest events", func() {
+		eventCreationInterval := 10 * time.Second
+
+		withLogProviderForScope(f, eventsScope, func(p *sdLogProvider) {
+			ginkgo.By("Running pods to generate events while waiting for some of them to be ingested")
+			stopCh := make(chan struct{})
+			cleanupCh := make(chan struct{})
+			defer func() { <-cleanupCh }()
+			defer close(stopCh)
+			go func() {
+				defer ginkgo.GinkgoRecover()
+				defer close(cleanupCh)
+
+				wait.PollUntil(eventCreationInterval, func() (bool, error) {
+					podName := fmt.Sprintf("synthlogger-%s", string(uuid.NewUUID()))
+					err := utils.NewLoadLoggingPod(podName, "", 1, 1*time.Second).Start(f)
+					if err != nil {
+						framework.Logf("Failed to create a logging pod: %v", err)
+					}
+					return false, nil
+				}, stopCh)
+			}()
+
+			ginkgo.By("Waiting for events to ingest")
+			c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, "")
+			err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+			framework.ExpectNoError(err)
+		})
+	})
+
+	ginkgo.It("should ingest system logs from all nodes", func() {
+		withLogProviderForScope(f, systemScope, func(p *sdLogProvider) {
+			ginkgo.By("Waiting for some system logs to ingest")
+			nodeIds := utils.GetNodeIds(f.ClientSet)
+			c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, nodeIds...)
+			err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+			framework.ExpectNoError(err)
+		})
+	})
+})
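The events spec above coordinates its pod-generator goroutine through two channels. Deferred calls run last-in-first-out, so `close(stopCh)` fires before `<-cleanupCh`: the spec first signals the generator to stop (ending `wait.PollUntil`), then blocks until the goroutine closes `cleanupCh`. A self-contained sketch of the same shutdown handshake:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	stopCh := make(chan struct{})
	cleanupCh := make(chan struct{})
	defer func() { <-cleanupCh }() // runs second: wait for the worker to finish
	defer close(stopCh)            // runs first: tell the worker to stop

	go func() {
		defer close(cleanupCh)
		for {
			select {
			case <-stopCh:
				fmt.Println("worker: stopping")
				return
			default:
				time.Sleep(10 * time.Millisecond) // simulated work
			}
		}
	}()

	fmt.Println("main: done, shutting worker down")
}
```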
101
test/e2e/instrumentation/logging/stackdrvier/soak.go
Normal file
101
test/e2e/instrumentation/logging/stackdrvier/soak.go
Normal file
@ -0,0 +1,101 @@
|
||||
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stackdriver

import (
	"fmt"
	"math"
	"time"

	"k8s.io/kubernetes/test/e2e/framework"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"

	"github.com/onsi/ginkgo"
)

const (
	// maxAllowedLostFraction is the fraction of lost logs considered acceptable.
	maxAllowedLostFraction = 0.01
	// maxAllowedRestartsPerHour is the number of fluentd container restarts
	// considered acceptable. Once per hour is fine for now, as long as it
	// doesn't lose too many logs.
	maxAllowedRestartsPerHour = 1.0
	// lastPodIngestionSlack is the amount of time to wait for the last pod's
	// logs to be ingested by the logging agent.
	lastPodIngestionSlack = 5 * time.Minute
)

var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() {
	f := framework.NewDefaultFramework("sd-logging-load")

	ginkgo.It("should ingest logs from applications running for a prolonged amount of time", func() {
		withLogProviderForScope(f, podsScope, func(p *sdLogProvider) {
			nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items
			maxPodCount := 10
			jobDuration := 30 * time.Minute
			linesPerPodPerSecond := 100
			// TODO(crassirostris): Increase to 21 hrs
			testDuration := 3 * time.Hour
			ingestionInterval := 1 * time.Minute
			ingestionTimeout := testDuration + 30*time.Minute
			allowedRestarts := int(math.Ceil(float64(testDuration) /
				float64(time.Hour) * maxAllowedRestartsPerHour))

			podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
			podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
			linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())

			// pods is a flat array of all pods to be run and to expect in Stackdriver.
			pods := []utils.FiniteLoggingPod{}
			// podsByRun is a two-dimensional array of pods, where the first dimension
			// is the run index and the second dimension is the node index. Since we
			// want to create an equal load on all nodes, for the same run we have one
			// pod per node.
			podsByRun := [][]utils.FiniteLoggingPod{}
			for runIdx := 0; runIdx < podRunCount; runIdx++ {
				podsInRun := []utils.FiniteLoggingPod{}
				for nodeIdx, node := range nodes {
					podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx)
					pod := utils.NewLoadLoggingPod(podName, node.Name, linesPerPod, jobDuration)
					pods = append(pods, pod)
					podsInRun = append(podsInRun, pod)
				}
				podsByRun = append(podsByRun, podsInRun)
			}

			ginkgo.By("Running short-living pods")
			go func() {
				t := time.NewTicker(podRunDelay)
				defer t.Stop()
				for runIdx := 0; runIdx < podRunCount; runIdx++ {
					// Starting one pod on each node.
					for _, pod := range podsByRun[runIdx] {
						if err := pod.Start(f); err != nil {
							framework.Logf("Failed to start pod: %v", err)
						}
					}
					<-t.C
				}
			}()

			checker := utils.NewFullIngestionPodLogChecker(p, maxAllowedLostFraction, pods...)
			err := utils.WaitForLogs(checker, ingestionInterval, ingestionTimeout)
			framework.ExpectNoError(err)

			framework.ExpectNoError(utils.EnsureLoggingAgentRestartsCount(f, p.LoggingAgentName(), allowedRestarts))
		})
	})
})
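The load profile implied by these constants is easier to see with concrete numbers. The following standalone sketch (editorial illustration, not part of the commit) reproduces the arithmetic from the test body above:

// Reproduces the derived load parameters of the soak test:
// a new wave of pods starts every podRunDelay, podRunCount waves in total,
// each pod emitting linesPerPod numbered lines over jobDuration.
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	maxPodCount := 10
	jobDuration := 30 * time.Minute
	linesPerPodPerSecond := 100
	testDuration := 3 * time.Hour
	maxAllowedRestartsPerHour := 1.0

	podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
	podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1
	linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
	allowedRestarts := int(math.Ceil(float64(testDuration) /
		float64(time.Hour) * maxAllowedRestartsPerHour))

	fmt.Println(podRunDelay)     // 3m0s
	fmt.Println(podRunCount)     // 51
	fmt.Println(linesPerPod)     // 180000
	fmt.Println(allowedRestarts) // 3
}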
388
test/e2e/instrumentation/logging/stackdrvier/utils.go
Normal file
@ -0,0 +1,388 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stackdriver

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"

	"golang.org/x/net/context"
	"golang.org/x/oauth2/google"
	sd "google.golang.org/api/logging/v2beta1"
	pubsub "google.golang.org/api/pubsub/v1"
)

const (
	// The amount of time to wait for Stackdriver Logging
	// sink to become operational
	sinkStartupTimeout = 10 * time.Minute

	// The limit on the number of messages to pull from PubSub
	maxPullLogMessages = 100 * 1000

	// maxQueueSize is the limit on the number of messages in the single queue.
	maxQueueSize = 10 * 1000

	// PubSub topic with log entries polling interval
	sdLoggingPollInterval = 100 * time.Millisecond
)

type logProviderScope int

const (
	podsScope logProviderScope = iota
	eventsScope
	systemScope
)

var _ utils.LogProvider = &sdLogProvider{}

type sdLogProvider struct {
	sdService     *sd.Service
	pubsubService *pubsub.Service

	framework *framework.Framework

	topic        *pubsub.Topic
	subscription *pubsub.Subscription
	logSink      *sd.LogSink

	pollingStopChannel chan struct{}

	queueCollection utils.LogsQueueCollection

	scope logProviderScope
}

func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogProvider, error) {
	ctx := context.Background()
	hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
	if err != nil {
		return nil, err
	}
	sdService, err := sd.New(hc)
	if err != nil {
		return nil, err
	}

	pubsubService, err := pubsub.New(hc)
	if err != nil {
		return nil, err
	}

	provider := &sdLogProvider{
		scope:              scope,
		sdService:          sdService,
		pubsubService:      pubsubService,
		framework:          f,
		pollingStopChannel: make(chan struct{}, 1),
		queueCollection:    utils.NewLogsQueueCollection(maxQueueSize),
	}
	return provider, nil
}

func (p *sdLogProvider) Init() error {
	projectID := framework.TestContext.CloudConfig.ProjectID
	nsName := p.framework.Namespace.Name

	topic, err := p.createPubSubTopic(projectID, nsName)
	if err != nil {
		return fmt.Errorf("failed to create PubSub topic: %v", err)
	}
	p.topic = topic

	subs, err := p.createPubSubSubscription(projectID, nsName, topic.Name)
	if err != nil {
		return fmt.Errorf("failed to create PubSub subscription: %v", err)
	}
	p.subscription = subs

	logSink, err := p.createSink(projectID, nsName, topic.Name)
	if err != nil {
		return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
	}
	p.logSink = logSink

	if err = p.authorizeSink(); err != nil {
		return fmt.Errorf("failed to authorize log sink: %v", err)
	}

	if err = p.waitSinkInit(); err != nil {
		return fmt.Errorf("failed to wait for sink to become operational: %v", err)
	}

	go p.pollLogs()

	return nil
}

func (p *sdLogProvider) Cleanup() {
	p.pollingStopChannel <- struct{}{}

	if p.logSink != nil {
		projectID := framework.TestContext.CloudConfig.ProjectID
		sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, p.logSink.Name)
		sinksService := p.sdService.Projects.Sinks
		if _, err := sinksService.Delete(sinkNameID).Do(); err != nil {
			framework.Logf("Failed to delete LogSink: %v", err)
		}
	}

	if p.subscription != nil {
		subsService := p.pubsubService.Projects.Subscriptions
		if _, err := subsService.Delete(p.subscription.Name).Do(); err != nil {
			framework.Logf("Failed to delete PubSub subscription: %v", err)
		}
	}

	if p.topic != nil {
		topicsService := p.pubsubService.Projects.Topics
		if _, err := topicsService.Delete(p.topic.Name).Do(); err != nil {
			framework.Logf("Failed to delete PubSub topic: %v", err)
		}
	}
}

func (p *sdLogProvider) ReadEntries(name string) []utils.LogEntry {
	return p.queueCollection.Pop(name)
}

func (p *sdLogProvider) LoggingAgentName() string {
	return "fluentd-gcp"
}

func (p *sdLogProvider) createPubSubTopic(projectID, topicName string) (*pubsub.Topic, error) {
	topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
	topic := &pubsub.Topic{
		Name: topicFullName,
	}
	return p.pubsubService.Projects.Topics.Create(topicFullName, topic).Do()
}

func (p *sdLogProvider) createPubSubSubscription(projectID, subsName, topicName string) (*pubsub.Subscription, error) {
	subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subsName)
	subs := &pubsub.Subscription{
		Name:  subsFullName,
		Topic: topicName,
	}
	return p.pubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
}

func (p *sdLogProvider) createSink(projectID, sinkName, topicName string) (*sd.LogSink, error) {
	filter, err := p.buildFilter()
	if err != nil {
		return nil, err
	}
	framework.Logf("Using the following filter for log entries: %s", filter)
	sink := &sd.LogSink{
		Name:        sinkName,
		Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
		Filter:      filter,
	}
	projectDst := fmt.Sprintf("projects/%s", projectID)
	return p.sdService.Projects.Sinks.Create(projectDst, sink).Do()
}

func (p *sdLogProvider) buildFilter() (string, error) {
	switch p.scope {
	case podsScope:
		return fmt.Sprintf("resource.type=\"container\" AND resource.labels.namespace_id=\"%s\"",
			p.framework.Namespace.Name), nil
	case eventsScope:
		return fmt.Sprintf("resource.type=\"gke_cluster\" AND jsonPayload.metadata.namespace=\"%s\"",
			p.framework.Namespace.Name), nil
	case systemScope:
		nodeFilters := []string{}
		for _, nodeID := range utils.GetNodeIds(p.framework.ClientSet) {
			nodeFilter := fmt.Sprintf("resource.labels.instance_id=%s", nodeID)
			nodeFilters = append(nodeFilters, nodeFilter)
		}
		return fmt.Sprintf("resource.type=\"gce_instance\" AND (%s)",
			strings.Join(nodeFilters, " OR ")), nil
	}
	return "", fmt.Errorf("Unknown log provider scope: %v", p.scope)
}

func (p *sdLogProvider) authorizeSink() error {
	topicsService := p.pubsubService.Projects.Topics
	policy, err := topicsService.GetIamPolicy(p.topic.Name).Do()
	if err != nil {
		return err
	}

	binding := &pubsub.Binding{
		Role:    "roles/pubsub.publisher",
		Members: []string{p.logSink.WriterIdentity},
	}
	policy.Bindings = append(policy.Bindings, binding)
	req := &pubsub.SetIamPolicyRequest{Policy: policy}
	if _, err = topicsService.SetIamPolicy(p.topic.Name, req).Do(); err != nil {
		return err
	}

	return nil
}

func (p *sdLogProvider) waitSinkInit() error {
	framework.Logf("Waiting for log sink to become operational")
	return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
		err := publish(p.pubsubService, p.topic, "embrace eternity")
		if err != nil {
			framework.Logf("Failed to push message to PubSub due to %v", err)
		}

		messages, err := pullAndAck(p.pubsubService, p.subscription)
		if err != nil {
			framework.Logf("Failed to pull messages from PubSub due to %v", err)
			return false, nil
		}
		if len(messages) > 0 {
			framework.Logf("Sink %s is operational", p.logSink.Name)
			return true, nil
		}

		return false, nil
	})
}

func (p *sdLogProvider) pollLogs() {
	wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
		messages, err := pullAndAck(p.pubsubService, p.subscription)
		if err != nil {
			framework.Logf("Failed to pull messages from PubSub due to %v", err)
			return false, nil
		}

		for _, msg := range messages {
			logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
			if err != nil {
				framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
				continue
			}

			var sdLogEntry sd.LogEntry
			if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
				framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
				continue
			}

			name, ok := p.tryGetName(sdLogEntry)
			if !ok {
				framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
				continue
			}

			logEntry, err := convertLogEntry(sdLogEntry)
			if err != nil {
				framework.Logf("Failed to parse Stackdriver LogEntry: %v", err)
				continue
			}

			p.queueCollection.Push(name, logEntry)
		}

		return false, nil
	}, p.pollingStopChannel)
}

func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {
	switch sdLogEntry.Resource.Type {
	case "container":
		return sdLogEntry.Resource.Labels["pod_id"], true
	case "gke_cluster":
		return "", true
	case "gce_instance":
		return sdLogEntry.Resource.Labels["instance_id"], true
	}
	return "", false
}

func convertLogEntry(sdLogEntry sd.LogEntry) (utils.LogEntry, error) {
	if sdLogEntry.TextPayload != "" {
		return utils.LogEntry{TextPayload: sdLogEntry.TextPayload}, nil
	}

	bytes, err := sdLogEntry.JsonPayload.MarshalJSON()
	if err != nil {
		return utils.LogEntry{}, fmt.Errorf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
	}

	var jsonObject map[string]interface{}
	err = json.Unmarshal(bytes, &jsonObject)
	if err != nil {
		return utils.LogEntry{},
			fmt.Errorf("Failed to deserialize jsonPayload as json object %s", string(bytes[:]))
	}
	return utils.LogEntry{JSONPayload: jsonObject}, nil
}

func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
	subsService := service.Projects.Subscriptions
	req := &pubsub.PullRequest{
		ReturnImmediately: true,
		MaxMessages:       maxPullLogMessages,
	}

	resp, err := subsService.Pull(subs.Name, req).Do()
	if err != nil {
		return nil, err
	}

	var ids []string
	for _, msg := range resp.ReceivedMessages {
		ids = append(ids, msg.AckId)
	}
	if len(ids) > 0 {
		ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
		if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
			framework.Logf("Failed to ack poll: %v", err)
		}
	}

	return resp.ReceivedMessages, nil
}

func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
	topicsService := service.Projects.Topics
	req := &pubsub.PublishRequest{
		Messages: []*pubsub.PubsubMessage{
			{
				Data: base64.StdEncoding.EncodeToString([]byte(msg)),
			},
		},
	}
	_, err := topicsService.Publish(topic.Name, req).Do()
	return err
}

func withLogProviderForScope(f *framework.Framework, scope logProviderScope, fun func(*sdLogProvider)) {
	p, err := newSdLogProvider(f, scope)
	framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")

	err = p.Init()
	defer p.Cleanup()
	framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")

	err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName())
	framework.ExpectNoError(err, "Logging agents deployed incorrectly")

	fun(p)
}
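The provider above wires Stackdriver Logging to the test through a PubSub round-trip: a sink exports matching entries to a topic, pollLogs pulls them from the subscription and sorts them into per-name queues. A minimal sketch of how a test consumes it (names taken from this diff; the test body itself and the durations are hypothetical, and f is the usual framework.Framework):

// Hypothetical usage of the helpers introduced in this commit.
ginkgo.It("should ingest logs from a single pod", func() {
	withLogProviderForScope(f, podsScope, func(p *sdLogProvider) {
		pod, err := utils.StartAndReturnSelf(
			utils.NewRepeatingLoggingPod("synthlogger", "test line"), f)
		framework.ExpectNoError(err, "Failed to start a pod")

		// The checker reads entries that pollLogs() pushed into the queue
		// keyed by the name extracted from the entry's resource labels.
		c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name())
		framework.ExpectNoError(utils.WaitForLogs(c, 10*time.Second, 10*time.Minute))
	})
})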
@ -1,320 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logging

import (
	"fmt"
	"regexp"
	"strconv"
	"time"

	api_v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/util/integer"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/test/e2e/framework"
)

const (
	// Duration of delay between any two attempts to check if all logs are ingested
	ingestionRetryDelay = 30 * time.Second

	// Amount of requested cores for logging container in millicores
	loggingContainerCpuRequest = 10

	// Amount of requested memory for logging container in bytes
	loggingContainerMemoryRequest = 10 * 1024 * 1024

	// Name of the container used for logging tests
	loggingContainerName = "logging-container"
)

var (
	// Regexp matching the contents of log entries, parsed or not
	logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")
)

type logEntry struct {
	Payload string
}

func (entry logEntry) getLogEntryNumber() (int, bool) {
	submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload)
	if submatch == nil || len(submatch) < 2 {
		return 0, false
	}

	lineNumber, err := strconv.Atoi(submatch[1])
	return lineNumber, err == nil
}

type logsProvider interface {
	Init() error
	Cleanup()
	ReadEntries(*loggingPod) []logEntry
	FluentdApplicationName() string
}

type loggingTestConfig struct {
	LogsProvider              logsProvider
	Pods                      []*loggingPod
	IngestionTimeout          time.Duration
	MaxAllowedLostFraction    float64
	MaxAllowedFluentdRestarts int
}

// Type to track the progress of logs generating pod
type loggingPod struct {
	// Name equals to the pod name and the container name.
	Name string
	// NodeName is the name of the node this pod will be
	// assigned to. Can be empty.
	NodeName string
	// Occurrences is a cache of ingested and read entries.
	Occurrences map[int]logEntry
	// ExpectedLinesNumber is the number of lines that are
	// expected to be ingested from this pod.
	ExpectedLinesNumber int
	// RunDuration is how long the pod will live.
	RunDuration time.Duration
}

func newLoggingPod(podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
	return &loggingPod{
		Name:                podName,
		NodeName:            nodeName,
		Occurrences:         make(map[int]logEntry),
		ExpectedLinesNumber: totalLines,
		RunDuration:         loggingDuration,
	}
}

func (p *loggingPod) Start(f *framework.Framework) {
	framework.Logf("Starting pod %s", p.Name)
	f.PodClient().Create(&api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name: p.Name,
		},
		Spec: api_v1.PodSpec{
			RestartPolicy: api_v1.RestartPolicyNever,
			Containers: []api_v1.Container{
				{
					Name:  loggingContainerName,
					Image: "gcr.io/google_containers/logs-generator:v0.1.0",
					Env: []api_v1.EnvVar{
						{
							Name:  "LOGS_GENERATOR_LINES_TOTAL",
							Value: strconv.Itoa(p.ExpectedLinesNumber),
						},
						{
							Name:  "LOGS_GENERATOR_DURATION",
							Value: p.RunDuration.String(),
						},
					},
					Resources: api_v1.ResourceRequirements{
						Requests: api_v1.ResourceList{
							api_v1.ResourceCPU: *resource.NewMilliQuantity(
								loggingContainerCpuRequest,
								resource.DecimalSI),
							api_v1.ResourceMemory: *resource.NewQuantity(
								loggingContainerMemoryRequest,
								resource.BinarySI),
						},
					},
				},
			},
			NodeName: p.NodeName,
		},
	})
}

func startNewLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod {
	pod := newLoggingPod(podName, nodeName, totalLines, loggingDuration)
	pod.Start(f)
	return pod
}

func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error {
	podHasIngestedLogs := make([]bool, len(config.Pods))
	podWithIngestedLogsCount := 0

	for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
		for podIdx, pod := range config.Pods {
			if podHasIngestedLogs[podIdx] {
				continue
			}

			entries := config.LogsProvider.ReadEntries(pod)
			if len(entries) == 0 {
				framework.Logf("No log entries from pod %s", pod.Name)
				continue
			}

			for _, entry := range entries {
				if _, ok := entry.getLogEntryNumber(); ok {
					framework.Logf("Found some log entries from pod %s", pod.Name)
					podHasIngestedLogs[podIdx] = true
					podWithIngestedLogsCount++
					break
				}
			}
		}

		if podWithIngestedLogsCount == len(config.Pods) {
			break
		}
	}

	if podWithIngestedLogsCount < len(config.Pods) {
		return fmt.Errorf("some logs were ingested for %d pods out of %d", podWithIngestedLogsCount, len(config.Pods))
	}

	return nil
}

func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig) error {
	expectedLinesNumber := 0
	for _, pod := range config.Pods {
		expectedLinesNumber += pod.ExpectedLinesNumber
	}

	totalMissing := expectedLinesNumber

	missingByPod := make([]int, len(config.Pods))
	for podIdx, pod := range config.Pods {
		missingByPod[podIdx] = pod.ExpectedLinesNumber
	}

	for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
		missing := 0
		for podIdx, pod := range config.Pods {
			if missingByPod[podIdx] == 0 {
				continue
			}

			missingByPod[podIdx] = pullMissingLogsCount(config.LogsProvider, pod)
			missing += missingByPod[podIdx]
		}

		totalMissing = missing
		if totalMissing > 0 {
			framework.Logf("Still missing %d lines in total", totalMissing)
		} else {
			break
		}
	}

	lostFraction := float64(totalMissing) / float64(expectedLinesNumber)

	if totalMissing > 0 {
		framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
			config.IngestionTimeout, totalMissing, lostFraction*100)
		for podIdx, missing := range missingByPod {
			if missing != 0 {
				framework.Logf("Still missing %d lines for pod %s", missing, config.Pods[podIdx].Name)
			}
		}
	}

	if lostFraction > config.MaxAllowedLostFraction {
		return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
			lostFraction*100, config.MaxAllowedLostFraction*100)
	}

	fluentdPods, err := getFluentdPods(f, config.LogsProvider.FluentdApplicationName())
	if err != nil {
		return fmt.Errorf("failed to get fluentd pods due to %v", err)
	}

	maxRestartCount := 0
	for _, fluentdPod := range fluentdPods.Items {
		restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount)
		maxRestartCount = integer.IntMax(maxRestartCount, restartCount)

		framework.Logf("Fluentd pod %s on node %s was restarted %d times",
			fluentdPod.Name, fluentdPod.Spec.NodeName, restartCount)
	}

	if maxRestartCount > config.MaxAllowedFluentdRestarts {
		return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d",
			maxRestartCount, config.MaxAllowedFluentdRestarts)
	}

	return nil
}

func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int {
	missingOnPod, err := getMissingLinesCount(logsProvider, pod)
	if err != nil {
		framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
		return pod.ExpectedLinesNumber
	}

	return missingOnPod
}

func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
	entries := logsProvider.ReadEntries(pod)

	for _, entry := range entries {
		lineNumber, ok := entry.getLogEntryNumber()
		if !ok {
			continue
		}

		if lineNumber < 0 || lineNumber >= pod.ExpectedLinesNumber {
			framework.Logf("Unexpected line number: %d", lineNumber)
		} else {
			pod.Occurrences[lineNumber] = entry
		}
	}

	return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
}

func ensureSingleFluentdOnEachNode(f *framework.Framework, fluentdApplicationName string) error {
	fluentdPodList, err := getFluentdPods(f, fluentdApplicationName)
	if err != nil {
		return err
	}

	fluentdPodsPerNode := make(map[string]int)
	for _, fluentdPod := range fluentdPodList.Items {
		fluentdPodsPerNode[fluentdPod.Spec.NodeName]++
	}

	nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
	for _, node := range nodeList.Items {
		fluentdPodCount, ok := fluentdPodsPerNode[node.Name]

		if !ok {
			return fmt.Errorf("node %s doesn't have fluentd instance", node.Name)
		} else if fluentdPodCount != 1 {
			return fmt.Errorf("node %s contains %d fluentd instances, expected exactly one", node.Name, fluentdPodCount)
		}
	}

	return nil
}

func getFluentdPods(f *framework.Framework, fluentdApplicationName string) (*api_v1.PodList, error) {
	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": fluentdApplicationName}))
	options := meta_v1.ListOptions{LabelSelector: label.String()}
	return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
}
45
test/e2e/instrumentation/logging/utils/BUILD
Normal file
@ -0,0 +1,45 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
    "@io_bazel_rules_go//go:def.bzl",
    "go_library",
)

go_library(
    name = "go_default_library",
    srcs = [
        "log_provider.go",
        "logging_agent.go",
        "logging_pod.go",
        "misc.go",
        "types.go",
        "wait.go",
    ],
    tags = ["automanaged"],
    deps = [
        "//pkg/api:go_default_library",
        "//test/e2e/framework:go_default_library",
        "//vendor/k8s.io/api/core/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
        "//vendor/k8s.io/client-go/util/integer:go_default_library",
    ],
)

filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
)
25
test/e2e/instrumentation/logging/utils/log_provider.go
Normal file
@ -0,0 +1,25 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

// LogProvider interface provides an API to get logs from the logging backend.
type LogProvider interface {
	Init() error
	Cleanup()
	ReadEntries(name string) []LogEntry
	LoggingAgentName() string
}
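Any backend can be plugged into the checkers by implementing this interface. A minimal in-memory fake (purely illustrative, not part of this commit) could look like the following, reusing LogEntry and LogsQueueCollection from this package:

// fakeLogProvider is a hypothetical in-memory LogProvider; it satisfies the
// interface above and could back unit tests of the checkers.
type fakeLogProvider struct {
	queues LogsQueueCollection
}

func newFakeLogProvider() *fakeLogProvider {
	return &fakeLogProvider{queues: NewLogsQueueCollection(1000)}
}

func (f *fakeLogProvider) Init() error              { return nil }
func (f *fakeLogProvider) Cleanup()                 {}
func (f *fakeLogProvider) LoggingAgentName() string { return "fake-agent" }

// Ingest simulates the backend receiving a log line for the given name.
func (f *fakeLogProvider) Ingest(name, line string) {
	f.queues.Push(name, LogEntry{TextPayload: line})
}

func (f *fakeLogProvider) ReadEntries(name string) []LogEntry {
	return f.queues.Pop(name)
}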
86
test/e2e/instrumentation/logging/utils/logging_agent.go
Normal file
@ -0,0 +1,86 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	"fmt"

	api_v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/util/integer"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/test/e2e/framework"
)

// EnsureLoggingAgentDeployment checks that logging agent is present on each
// node and returns an error if that's not true.
func EnsureLoggingAgentDeployment(f *framework.Framework, appName string) error {
	agentPods, err := getLoggingAgentPods(f, appName)
	if err != nil {
		return fmt.Errorf("failed to get logging agent pods: %v", err)
	}

	agentPerNode := make(map[string]int)
	for _, pod := range agentPods.Items {
		agentPerNode[pod.Spec.NodeName]++
	}

	nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
	for _, node := range nodeList.Items {
		agentPodsCount, ok := agentPerNode[node.Name]

		if !ok {
			return fmt.Errorf("node %s doesn't have logging agents, want 1", node.Name)
		} else if agentPodsCount != 1 {
			return fmt.Errorf("node %s has %d logging agents, want 1", node.Name, agentPodsCount)
		}
	}

	return nil
}

// EnsureLoggingAgentRestartsCount checks that each logging agent was restarted
// no more than maxRestarts times and returns an error if there's a pod which
// exceeds this number of restarts.
func EnsureLoggingAgentRestartsCount(f *framework.Framework, appName string, maxRestarts int) error {
	agentPods, err := getLoggingAgentPods(f, appName)
	if err != nil {
		return fmt.Errorf("failed to get logging agent pods: %v", err)
	}

	maxRestartCount := 0
	for _, pod := range agentPods.Items {
		restartCount := int(pod.Status.ContainerStatuses[0].RestartCount)
		maxRestartCount = integer.IntMax(maxRestartCount, restartCount)

		framework.Logf("Logging agent %s on node %s was restarted %d times",
			pod.Name, pod.Spec.NodeName, restartCount)
	}

	if maxRestartCount > maxRestarts {
		return fmt.Errorf("max logging agent restarts was %d, which is more than allowed %d",
			maxRestartCount, maxRestarts)
	}
	return nil
}

func getLoggingAgentPods(f *framework.Framework, appName string) (*api_v1.PodList, error) {
	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": appName}))
	options := meta_v1.ListOptions{LabelSelector: label.String()}
	return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
}
188
test/e2e/instrumentation/logging/utils/logging_pod.go
Normal file
@ -0,0 +1,188 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	"fmt"
	"strconv"
	"time"

	api_v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/e2e/framework"
)

const (
	// Amount of requested cores for logging container in millicores
	loggingContainerCPURequest = 10

	// Amount of requested memory for logging container in bytes
	loggingContainerMemoryRequest = 10 * 1024 * 1024

	// Name of the container used for logging tests
	loggingContainerName = "logging-container"
)

// LoggingPod is an interface of a pod that can be started and that logs
// something to its stdout, possibly indefinitely.
type LoggingPod interface {
	// Name equals to the Kubernetes pod name.
	Name() string

	// Start method controls when the logging pod is started in the cluster.
	Start(f *framework.Framework) error
}

// StartAndReturnSelf is a helper method to start a logging pod and
// immediately return it.
func StartAndReturnSelf(p LoggingPod, f *framework.Framework) (LoggingPod, error) {
	err := p.Start(f)
	return p, err
}

// FiniteLoggingPod is a logging pod that emits a known number of log lines.
type FiniteLoggingPod interface {
	LoggingPod

	// ExpectedLineCount returns the number of lines that are
	// expected to be ingested from this pod.
	ExpectedLineCount() int
}

var _ FiniteLoggingPod = &loadLoggingPod{}

type loadLoggingPod struct {
	name               string
	nodeName           string
	expectedLinesCount int
	runDuration        time.Duration
}

// NewLoadLoggingPod returns a logging pod that generates totalLines random
// lines over a period of length loggingDuration. Lines generated by this
// pod are numbered and have a well-defined structure.
func NewLoadLoggingPod(podName string, nodeName string, totalLines int,
	loggingDuration time.Duration) FiniteLoggingPod {
	return &loadLoggingPod{
		name:               podName,
		nodeName:           nodeName,
		expectedLinesCount: totalLines,
		runDuration:        loggingDuration,
	}
}

func (p *loadLoggingPod) Name() string {
	return p.name
}

func (p *loadLoggingPod) Start(f *framework.Framework) error {
	framework.Logf("Starting load logging pod %s", p.name)
	f.PodClient().Create(&api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name: p.name,
		},
		Spec: api_v1.PodSpec{
			RestartPolicy: api_v1.RestartPolicyNever,
			Containers: []api_v1.Container{
				{
					Name:  loggingContainerName,
					Image: "gcr.io/google_containers/logs-generator:v0.1.0",
					Env: []api_v1.EnvVar{
						{
							Name:  "LOGS_GENERATOR_LINES_TOTAL",
							Value: strconv.Itoa(p.expectedLinesCount),
						},
						{
							Name:  "LOGS_GENERATOR_DURATION",
							Value: p.runDuration.String(),
						},
					},
					Resources: api_v1.ResourceRequirements{
						Requests: api_v1.ResourceList{
							api_v1.ResourceCPU: *resource.NewMilliQuantity(
								loggingContainerCPURequest,
								resource.DecimalSI),
							api_v1.ResourceMemory: *resource.NewQuantity(
								loggingContainerMemoryRequest,
								resource.BinarySI),
						},
					},
				},
			},
			NodeName: p.nodeName,
		},
	})
	return framework.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
}

func (p *loadLoggingPod) ExpectedLineCount() int {
	return p.expectedLinesCount
}

var _ LoggingPod = &repeatingLoggingPod{}

type repeatingLoggingPod struct {
	name string
	line string
}

// NewRepeatingLoggingPod returns a logging pod that each second prints
// the line value to its stdout.
func NewRepeatingLoggingPod(podName string, line string) LoggingPod {
	return &repeatingLoggingPod{
		name: podName,
		line: line,
	}
}

func (p *repeatingLoggingPod) Name() string {
	return p.name
}

func (p *repeatingLoggingPod) Start(f *framework.Framework) error {
	framework.Logf("Starting repeating logging pod %s", p.name)
	f.PodClient().Create(&api_v1.Pod{
		ObjectMeta: meta_v1.ObjectMeta{
			Name: p.name,
		},
		Spec: api_v1.PodSpec{
			Containers: []api_v1.Container{
				{
					Name:  loggingContainerName,
					Image: "busybox",
					Command: []string{
						"/bin/sh",
						"-c",
						fmt.Sprintf("while :; do echo '%s'; sleep 1; done", p.line),
					},
					Resources: api_v1.ResourceRequirements{
						Requests: api_v1.ResourceList{
							api_v1.ResourceCPU: *resource.NewMilliQuantity(
								loggingContainerCPURequest,
								resource.DecimalSI),
							api_v1.ResourceMemory: *resource.NewQuantity(
								loggingContainerMemoryRequest,
								resource.BinarySI),
						},
					},
				},
			},
		},
	})
	return framework.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name)
}
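The two pod flavors serve different checker styles: load pods terminate after emitting a fixed number of numbered lines, while repeating pods log one fixed line per second until the namespace is torn down. A hypothetical fragment contrasting them (f is the usual framework.Framework):

// Finite: numbered lines, usable with NewFullIngestionPodLogChecker.
finite := utils.NewLoadLoggingPod("load-pod", "", 100, 1*time.Minute)

// Infinite: one fixed line per second, usable with UntilFirstEntry.
infinite := utils.NewRepeatingLoggingPod("repeat-pod", "hello from e2e")

framework.ExpectNoError(finite.Start(f))
framework.ExpectNoError(infinite.Start(f))
fmt.Println(finite.ExpectedLineCount()) // 100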
32
test/e2e/instrumentation/logging/utils/misc.go
Normal file
@ -0,0 +1,32 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
)

// GetNodeIds returns the list of node external IDs and panics in case of failure.
func GetNodeIds(cs clientset.Interface) []string {
	nodes := framework.GetReadySchedulableNodesOrDie(cs)
	nodeIds := []string{}
	for _, n := range nodes.Items {
		nodeIds = append(nodeIds, n.Spec.ExternalID)
	}
	return nodeIds
}
106
test/e2e/instrumentation/logging/utils/types.go
Normal file
@ -0,0 +1,106 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	"regexp"
	"strconv"
	"sync"
)

var (
	// Regexp matching the contents of log entries, parsed or not
	logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*")
)

// LogEntry represents a log entry received from the logging backend.
type LogEntry struct {
	TextPayload string
	JSONPayload map[string]interface{}
}

// TryGetEntryNumber returns the number of the log entry in sequence, if it
// was generated by the load logging pod (requires special log format).
func (entry LogEntry) TryGetEntryNumber() (int, bool) {
	submatch := logEntryMessageRegex.FindStringSubmatch(entry.TextPayload)
	if submatch == nil || len(submatch) < 2 {
		return 0, false
	}

	lineNumber, err := strconv.Atoi(submatch[1])
	return lineNumber, err == nil
}

// LogsQueueCollection is a thread-safe set of named log queues.
type LogsQueueCollection interface {
	Push(name string, logs ...LogEntry)
	Pop(name string) []LogEntry
}

var _ LogsQueueCollection = &logsQueueCollection{}

type logsQueueCollection struct {
	mutex     *sync.Mutex
	queues    map[string]chan LogEntry
	queueSize int
}

// NewLogsQueueCollection returns a new LogsQueueCollection where each queue
// is created with a default size of queueSize.
func NewLogsQueueCollection(queueSize int) LogsQueueCollection {
	return &logsQueueCollection{
		mutex:     &sync.Mutex{},
		queues:    map[string]chan LogEntry{},
		queueSize: queueSize,
	}
}

func (c *logsQueueCollection) Push(name string, logs ...LogEntry) {
	q := c.getQueue(name)
	for _, log := range logs {
		q <- log
	}
}

func (c *logsQueueCollection) Pop(name string) []LogEntry {
	q := c.getQueue(name)
	var entries []LogEntry
pollingLoop:
	for {
		select {
		case entry := <-q:
			entries = append(entries, entry)
		default:
			break pollingLoop
		}
	}
	return entries
}

func (c *logsQueueCollection) getQueue(name string) chan LogEntry {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if q, ok := c.queues[name]; ok {
		return q
	}

	newQ := make(chan LogEntry, c.queueSize)
	c.queues[name] = newQ
	return newQ
}
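The regex above accepts both the raw glog-formatted output of the logs-generator image and an already-stripped payload, extracting the sequence number in either case. An illustrative fragment (not part of the commit; assumes fmt is imported):

// Both forms yield the same sequence number, 42.
parsed := LogEntry{TextPayload: "I0101 00:00:00.000000 1 logs_generator.go:67] 42 GET /api/v1/pods 200"}
if n, ok := parsed.TryGetEntryNumber(); ok {
	fmt.Println(n) // 42
}

stripped := LogEntry{TextPayload: "42 GET /api/v1/pods 200"}
if n, ok := stripped.TryGetEntryNumber(); ok {
	fmt.Println(n) // 42
}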
204
test/e2e/instrumentation/logging/utils/wait.go
Normal file
@ -0,0 +1,204 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/test/e2e/framework"
)

// LogChecker is an interface for an entity that can check whether the logging
// backend contains all wanted log entries.
type LogChecker interface {
	EntriesIngested() (bool, error)
	Timeout() error
}

// IngestionPred is a type of function that checks whether all required
// log entries were ingested.
type IngestionPred func(string, []LogEntry) (bool, error)

// UntilFirstEntry is an IngestionPred that checks that at least one entry was
// ingested.
var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) {
	return len(entries) > 0, nil
}

// TimeoutFun is a function that is called when the waiting times out.
type TimeoutFun func([]string, []bool) error

// JustTimeout returns the error with the list of names for which the backend
// is still missing logs.
var JustTimeout TimeoutFun = func(names []string, ingested []bool) error {
	failedNames := []string{}
	for i, name := range names {
		if !ingested[i] {
			failedNames = append(failedNames, name)
		}
	}
	return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s",
		strings.Join(failedNames, ","))
}

var _ LogChecker = &logChecker{}

type logChecker struct {
	provider      LogProvider
	names         []string
	ingested      []bool
	ingestionPred IngestionPred
	timeoutFun    TimeoutFun
}

// NewLogChecker constructs a LogChecker for a list of names from a custom
// IngestionPred and TimeoutFun.
func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker {
	return &logChecker{
		provider:      p,
		names:         names,
		ingested:      make([]bool, len(names)),
		ingestionPred: pred,
		timeoutFun:    timeout,
	}
}

func (c *logChecker) EntriesIngested() (bool, error) {
	allIngested := true
	for i, name := range c.names {
		if c.ingested[i] {
			continue
		}
		entries := c.provider.ReadEntries(name)
		ingested, err := c.ingestionPred(name, entries)
		if err != nil {
			return false, err
		}
		if ingested {
			c.ingested[i] = true
		}
		allIngested = allIngested && ingested
	}
	return allIngested, nil
}

func (c *logChecker) Timeout() error {
	return c.timeoutFun(c.names, c.ingested)
}

// NumberedIngestionPred is an IngestionPred that takes into account sequential
// numbers of ingested entries.
type NumberedIngestionPred func(string, map[int]bool) (bool, error)

// NumberedTimeoutFun is a TimeoutFun that takes into account sequential
// numbers of ingested entries.
type NumberedTimeoutFun func([]string, map[string]map[int]bool) error

// NewNumberedLogChecker returns a log checker that works with numbered log
// entries generated by load logging pods.
func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred,
	timeout NumberedTimeoutFun, names ...string) LogChecker {
	occs := map[string]map[int]bool{}
	return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) {
		occ, ok := occs[name]
		if !ok {
			occ = map[int]bool{}
			occs[name] = occ
		}
		for _, entry := range entries {
			if no, ok := entry.TryGetEntryNumber(); ok {
				occ[no] = true
			}
		}
		return pred(name, occ)
	}, func(names []string, _ []bool) error {
		return timeout(names, occs)
	}, names...)
}

// NewFullIngestionPodLogChecker returns a log checker that works with numbered
// log entries generated by load logging pods and waits until all entries are
// ingested. If the timeout is reached, a fraction of lost logs up to slack is
// considered tolerable.
func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker {
	podsMap := map[string]FiniteLoggingPod{}
	for _, p := range pods {
		podsMap[p.Name()] = p
	}
	return NewNumberedLogChecker(p, getFullIngestionPred(podsMap),
		getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
}

func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred {
	return func(name string, occ map[int]bool) (bool, error) {
		p := podsMap[name]
		ok := len(occ) == p.ExpectedLineCount()
		if !ok {
			framework.Logf("Pod %s is still missing %d lines", name, p.ExpectedLineCount()-len(occ))
		}
		return ok, nil
	}
}

func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun {
	return func(names []string, occs map[string]map[int]bool) error {
		totalGot, totalWant := 0, 0
		podsWithLosses := []string{}
		for _, name := range names {
			got := len(occs[name])
			want := podsMap[name].ExpectedLineCount()
			if got != want {
				podsWithLosses = append(podsWithLosses, name)
			}
			totalGot += got
			totalWant += want
		}
		if len(podsWithLosses) > 0 {
			framework.Logf("Still missing logs from: %s", strings.Join(podsWithLosses, ", "))
		}
		lostFrac := 1 - float64(totalGot)/float64(totalWant)
		if lostFrac > slack {
			return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable",
				lostFrac*100, slack*100)
		}
		return nil
	}
}

// WaitForLogs checks that logs are ingested, as reported by the log checker,
// until the timeout has passed. The function sleeps for interval between two
// log ingestion checks.
func WaitForLogs(c LogChecker, interval, timeout time.Duration) error {
	err := wait.Poll(interval, timeout, func() (bool, error) {
		return c.EntriesIngested()
	})
	if err == wait.ErrWaitTimeout {
		return c.Timeout()
	}
	return err
}

func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string {
	names := []string{}
	for _, p := range pods {
		names = append(names, p.Name())
	}
	return names
}
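Putting the pieces together, a typical full-ingestion check over a set of finite pods composes as follows (a sketch with hypothetical pod names and durations; provider is any LogProvider, such as the sdLogProvider from this commit, and f is the usual framework.Framework):

// Wait until all lines from two load pods are ingested, tolerating 1% loss;
// WaitForLogs polls EntriesIngested() and falls back to Timeout() on expiry.
pods := []utils.FiniteLoggingPod{
	utils.NewLoadLoggingPod("pod-0", "", 1000, 5*time.Minute),
	utils.NewLoadLoggingPod("pod-1", "", 1000, 5*time.Minute),
}
for _, pod := range pods {
	framework.ExpectNoError(pod.Start(f))
}
checker := utils.NewFullIngestionPodLogChecker(provider, 0.01, pods...)
framework.ExpectNoError(utils.WaitForLogs(checker, 30*time.Second, 20*time.Minute))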
@ -18,7 +18,7 @@ go_library(
    deps = [
        "//test/e2e/common:go_default_library",
        "//test/e2e/framework:go_default_library",
        "//test/e2e/instrumentation:go_default_library",
        "//test/e2e/instrumentation/common:go_default_library",
        "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library",
        "//vendor/github.com/onsi/ginkgo:go_default_library",
        "//vendor/golang.org/x/oauth2/google:go_default_library",
@ -23,7 +23,7 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/instrumentation"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"

	. "github.com/onsi/ginkgo"
)
@ -29,7 +29,7 @@ import (
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/instrumentation"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"

	. "github.com/onsi/ginkgo"
)
@ -29,7 +29,7 @@ import (
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/test/e2e/common"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/instrumentation"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"

	gcm "google.golang.org/api/monitoring/v3"
)