diff --git a/test/integration/logs/benchmark/.gitignore b/test/integration/logs/benchmark/.gitignore new file mode 100644 index 00000000000..16ae08cff0b --- /dev/null +++ b/test/integration/logs/benchmark/.gitignore @@ -0,0 +1,10 @@ +# Created by get-logs.sh: +/ci-kubernetes-kind-e2e-json-logging +/data/kind-worker-kubelet.log +/data/kube-apiserver.log +/data/kube-controller-manager.log +/data/kube-scheduler.log +/data/v3/kind-worker-kubelet.log +/data/v3/kube-apiserver.log +/data/v3/kube-controller-manager.log +/data/v3/kube-scheduler.log diff --git a/test/integration/logs/benchmark/README.md b/test/integration/logs/benchmark/README.md index d68a9a8deda..44fd0bb53f9 100644 --- a/test/integration/logs/benchmark/README.md +++ b/test/integration/logs/benchmark/README.md @@ -6,7 +6,7 @@ must be benchmarked before and after the change. ## Running the benchmark ``` -$ go test -bench=. -test.benchmem -benchmem . +go test -v -bench=. -benchmem -benchtime=10s . ``` ## Real log data @@ -28,9 +28,29 @@ Prow job: - `artifacts/logs/kind-control-plane/containers` - `artifacts/logs/kind-*/kubelet.log` -With sufficient credentials, `gsutil` can be used to download everything for a job with: +With sufficient credentials, `gsutil` can be used to download everything for a job directly +into a directory that then will be used by the benchmarks automatically: + ``` -gsutil -m cp -R gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/ . +kubernetes$ test/integration/logs/benchmark/get-logs.sh +++ dirname test/integration/logs/benchmark/get-logs.sh ++ cd test/integration/logs/benchmark +++ latest_job +++ gsutil cat gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/latest-build.txt ++ job=1618864842834186240 ++ rm -rf ci-kubernetes-kind-e2e-json-logging ++ mkdir ci-kubernetes-kind-e2e-json-logging +... 
+``` + +This sets up the `data` directory so that additional test cases are available +(`BenchmarkEncoding/v3/kind-worker-kubelet/`, +`BenchmarkEncoding/kube-scheduler/`, etc.). + + +To clean up, use +``` +git clean -fx test/integration/logs/benchmark ``` ## Analyzing log data @@ -39,5 +59,5 @@ While loading a file, some statistics about it are collected. Those are shown when running with: ``` -$ go test -v -bench=. -test.benchmem -benchmem . +go test -v -bench=BenchmarkEncoding/none -run=none . ``` diff --git a/test/integration/logs/benchmark/benchmark_test.go b/test/integration/logs/benchmark/benchmark_test.go index 6c071ce38cd..38249c648ef 100644 --- a/test/integration/logs/benchmark/benchmark_test.go +++ b/test/integration/logs/benchmark/benchmark_test.go @@ -18,12 +18,10 @@ package benchmark import ( "errors" - "flag" "fmt" "io" "io/fs" "os" - "path" "path/filepath" "regexp" "strconv" @@ -32,16 +30,18 @@ import ( "testing" "time" + "k8s.io/component-base/featuregate" logsapi "k8s.io/component-base/logs/api/v1" - logsjson "k8s.io/component-base/logs/json" + _ "k8s.io/component-base/logs/json/register" "k8s.io/klog/v2" ) func BenchmarkEncoding(b *testing.B) { + seen := map[string]bool{} + // Each "data/(v[0-9]/)?*.log" file is expected to contain JSON log // messages. We generate one sub-benchmark for each file where logging - // is tested with the log level from the directory. Symlinks can be - // used to test the same file with different levels. + // is tested with the log level from the directory. if err := filepath.Walk("data", func(path string, info fs.FileInfo, err error) error { if err != nil { return err @@ -53,8 +53,14 @@ func BenchmarkEncoding(b *testing.B) { if err != nil { return err } - if info.Mode()&fs.ModeSymlink == 0 { - b.Log(path + "\n" + stats.String()) + // Only print unique file statistics. They get shown for the + // first file where new statistics are encountered. 
The + // assumption here is that there are no files with + // different content and exactly the same statistics. + statsStr := stats.String() + if !seen[statsStr] { + b.Log(path + "\n" + statsStr) + seen[statsStr] = true } b.Run(strings.TrimSuffix(strings.TrimPrefix(path, "data/"), ".log"), func(b *testing.B) { // Take verbosity threshold from directory, if present. @@ -63,52 +69,57 @@ func BenchmarkEncoding(b *testing.B) { if vMatch != nil { v, _ = strconv.Atoi(vMatch[1]) } + fileSizes := map[string]int{} - b.Run("stats", func(b *testing.B) { - // Nothing to do. Use this for "go test -v - // -bench=BenchmarkLogging/.*/stats" to print - // just the statistics. - }) - b.Run("printf", func(b *testing.B) { + test := func(b *testing.B, format string, print func(logger klog.Logger, item logMessage)) { + state := klog.CaptureState() + defer state.Restore() + + var output bytesWritten + c := logsapi.NewLoggingConfiguration() + c.Format = format + o := logsapi.LoggingOptions{ + ErrorStream: &output, + InfoStream: &output, + } + klog.SetOutput(&output) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil { + b.Fatalf("Unexpected error configuring logging: %v", err) + } + logger := klog.Background() b.ResetTimer() - output = 0 + start := time.Now() + total := int64(0) for i := 0; i < b.N; i++ { for _, item := range messages { if item.verbosity <= v { - printf(item) + print(logger, item) + total++ } } } - fileSizes["printf"] = int(output) / b.N + end := time.Now() + duration := end.Sub(start) + + // Report messages/s instead of ns/op because "op" varies. 
+ b.ReportMetric(0, "ns/op") + b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s") + fileSizes[filepath.Base(b.Name())] = int(output) + } + + b.Run("printf", func(b *testing.B) { + test(b, "text", func(_ klog.Logger, item logMessage) { + printf(item) + }) }) b.Run("structured", func(b *testing.B) { - b.ResetTimer() - output = 0 - for i := 0; i < b.N; i++ { - for _, item := range messages { - if item.verbosity <= v { - prints(item) - } - } - } - fileSizes["structured"] = int(output) / b.N + test(b, "text", prints) }) b.Run("JSON", func(b *testing.B) { - klog.SetLogger(jsonLogger) - defer klog.ClearLogger() - b.ResetTimer() - output = 0 - for i := 0; i < b.N; i++ { - for _, item := range messages { - if item.verbosity <= v { - prints(item) - } - } - } - fileSizes["JSON"] = int(output) / b.N + test(b, "json", prints) }) - b.Log(fmt.Sprintf("file sizes: %v\n", fileSizes)) + b.Log(fmt.Sprintf("%s: file sizes: %v\n", path, fileSizes)) }) return nil }); err != nil { @@ -135,9 +146,6 @@ type loadGeneratorConfig struct { // See https://github.com/kubernetes/kubernetes/issues/107029 for the // motivation. func BenchmarkWriting(b *testing.B) { - flag.Set("skip_headers", "false") - defer flag.Set("skip_headers", "true") - // This could be made configurable and/or we could benchmark different // configurations automatically. 
config := loadGeneratorConfig{ @@ -159,70 +167,92 @@ func benchmarkWriting(b *testing.B, config loadGeneratorConfig) { } func benchmarkOutputFormats(b *testing.B, config loadGeneratorConfig, discard bool) { - tmpDir := b.TempDir() b.Run("structured", func(b *testing.B) { - var out *os.File - if !discard { - var err error - out, err = os.Create(path.Join(tmpDir, "all.log")) - if err != nil { - b.Fatal(err) - } - klog.SetOutput(out) - defer klog.SetOutput(&output) - } - generateOutput(b, config, nil, out) + benchmarkOutputFormat(b, config, discard, "text") }) b.Run("JSON", func(b *testing.B) { - var out1, out2 *os.File - if !discard { - var err error - out1, err = os.Create(path.Join(tmpDir, "stream-1.log")) - if err != nil { - b.Fatal(err) - } - defer out1.Close() - out2, err = os.Create(path.Join(tmpDir, "stream-2.log")) - if err != nil { - b.Fatal(err) - } - defer out2.Close() - } - o := logsapi.LoggingOptions{} - if discard { - o.ErrorStream = io.Discard - o.InfoStream = io.Discard - } else { - o.ErrorStream = out1 - o.InfoStream = out1 - } - - b.Run("single-stream", func(b *testing.B) { - c := logsapi.NewLoggingConfiguration() - logger, control := logsjson.Factory{}.Create(*c, o) - klog.SetLogger(logger) - defer klog.ClearLogger() - generateOutput(b, config, control.Flush, out1) - }) - - b.Run("split-stream", func(b *testing.B) { - c := logsapi.NewLoggingConfiguration() - c.Options.JSON.SplitStream = true - logger, control := logsjson.Factory{}.Create(*c, o) - klog.SetLogger(logger) - defer klog.ClearLogger() - generateOutput(b, config, control.Flush, out1, out2) - }) + benchmarkOutputFormat(b, config, discard, "json") }) } -func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), files ...*os.File) { +func benchmarkOutputFormat(b *testing.B, config loadGeneratorConfig, discard bool, format string) { + b.Run("single-stream", func(b *testing.B) { + benchmarkOutputFormatStream(b, config, discard, format, false) + }) + b.Run("split-stream", 
func(b *testing.B) { + benchmarkOutputFormatStream(b, config, discard, format, true) + }) +} + +func benchmarkOutputFormatStream(b *testing.B, config loadGeneratorConfig, discard bool, format string, splitStreams bool) { + tmpDir := b.TempDir() + state := klog.CaptureState() + defer state.Restore() + + featureGate := featuregate.NewFeatureGate() + logsapi.AddFeatureGates(featureGate) + if err := featureGate.SetFromMap(map[string]bool{ + string(logsapi.LoggingAlphaOptions): true, + string(logsapi.LoggingBetaOptions): true, + }); err != nil { + b.Fatalf("Set feature gates: %v", err) + } + + // Create a logging configuration using the exact same code as a normal + // component. In order to redirect output, we provide a LoggingOptions + // instance. + var o logsapi.LoggingOptions + c := logsapi.NewLoggingConfiguration() + c.Format = format + if splitStreams { + c.Options.JSON.SplitStream = true + if err := c.Options.JSON.InfoBufferSize.Set("64Ki"); err != nil { + b.Fatalf("Error setting buffer size: %v", err) + } + } + var files []*os.File + if discard { + o.ErrorStream = io.Discard + o.InfoStream = io.Discard + } else { + out1, err := os.Create(filepath.Join(tmpDir, "stream-1.log")) + if err != nil { + b.Fatal(err) + } + defer out1.Close() + out2, err := os.Create(filepath.Join(tmpDir, "stream-2.log")) + if err != nil { + b.Fatal(err) + } + defer out2.Close() + + if splitStreams { + files = append(files, out1, out2) + o.ErrorStream = out1 + o.InfoStream = out2 + } else { + files = append(files, out1) + o.ErrorStream = out1 + o.InfoStream = out1 + } + } + + klog.SetOutput(o.ErrorStream) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, featureGate); err != nil { + b.Fatalf("Unexpected error configuring logging: %v", err) + } + + generateOutput(b, config, files...) 
+} + +func generateOutput(b *testing.B, config loadGeneratorConfig, files ...*os.File) { msg := strings.Repeat("X", config.messageLength) err := errors.New("fail") start := time.Now() // Scale by 1000 because "go test -bench" starts with b.N == 1, which is very low. n := b.N * 1000 + total := config.workers * n b.ResetTimer() var wg sync.WaitGroup @@ -245,15 +275,15 @@ func generateOutput(b *testing.B, config loadGeneratorConfig, flush func(), file } wg.Wait() klog.Flush() - if flush != nil { - flush() - } b.StopTimer() - - // Print some information about the result. end := time.Now() duration := end.Sub(start) - total := n * config.workers + + // Report messages/s instead of ns/op because "op" varies. + b.ReportMetric(0, "ns/op") + b.ReportMetric(float64(total)/duration.Seconds(), "msgs/s") + + // Print some information about the result. b.Logf("Wrote %d log entries in %s -> %.1f/s", total, duration, float64(total)/duration.Seconds()) for i, file := range files { if file != nil { diff --git a/test/integration/logs/benchmark/common_test.go b/test/integration/logs/benchmark/common_test.go index 0b6fae600f3..76d851a44ce 100644 --- a/test/integration/logs/benchmark/common_test.go +++ b/test/integration/logs/benchmark/common_test.go @@ -18,28 +18,20 @@ package benchmark import ( "flag" - "io" - "github.com/go-logr/logr" - "go.uber.org/zap/zapcore" - - logsapi "k8s.io/component-base/logs/api/v1" - logsjson "k8s.io/component-base/logs/json" "k8s.io/klog/v2" ) func init() { - // Cause all klog output to be discarded with minimal overhead. - // We don't include time stamps and caller information. - // Individual tests can change that by calling flag.Set again, - // but should always restore this state here. + // hack/make-rules/test-integration.sh expects that all unit tests + // support -v and -vmodule. klog.InitFlags(nil) + + // Write all output into a single file. 
flag.Set("alsologtostderr", "false") flag.Set("logtostderr", "false") - flag.Set("skip_headers", "true") flag.Set("one_output", "true") flag.Set("stderrthreshold", "FATAL") - klog.SetOutput(&output) } type bytesWritten int64 @@ -50,22 +42,6 @@ func (b *bytesWritten) Write(data []byte) (int, error) { return l, nil } -func (b *bytesWritten) Sync() error { - return nil -} - -var output bytesWritten -var jsonLogger = newJSONLogger(&output) - -func newJSONLogger(out io.Writer) logr.Logger { - encoderConfig := &zapcore.EncoderConfig{ - MessageKey: "msg", - } - c := logsapi.NewLoggingConfiguration() - logger, _ := logsjson.NewJSONLogger(c.Verbosity, zapcore.AddSync(out), nil, encoderConfig) - return logger -} - func printf(item logMessage) { if item.isError { klog.Errorf("%s: %v %s", item.msg, item.err, item.kvs) @@ -74,17 +50,14 @@ func printf(item logMessage) { } } -// These variables are a workaround for logcheck complaining about the dynamic -// parameters. -var ( - errorS = klog.ErrorS - infoS = klog.InfoS -) - -func prints(item logMessage) { +func prints(logger klog.Logger, item logMessage) { if item.isError { - errorS(item.err, item.msg, item.kvs...) + logger.Error(item.err, item.msg, item.kvs...) // nolint: logcheck } else { - infoS(item.msg, item.kvs...) + logger.Info(item.msg, item.kvs...) // nolint: logcheck } } + +func printLogger(item logMessage) { + prints(klog.Background(), item) +} diff --git a/test/integration/logs/benchmark/data/container.log b/test/integration/logs/benchmark/data/container.log deleted file mode 100644 index 27ea85e898d..00000000000 --- a/test/integration/logs/benchmark/data/container.log +++ /dev/null @@ -1,2 +0,0 @@ -# This is a manually created message. See https://github.com/kubernetes/kubernetes/issues/106652 for the real one. 
-Nov 19 02:13:48 kind-worker2 kubelet[250]: {"ts":1637288028968.0125,"caller":"kuberuntime/kuberuntime_manager.go:902","msg":"Creating container in pod","v":0,"container":{"Name":"terminate-cmd-rpn","Image":"registry.k8s.io/e2e-test-images/busybox:1.29-2","Command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"TerminationMessagePath":"/dev/termination-log"}} diff --git a/test/integration/logs/benchmark/data/split.log b/test/integration/logs/benchmark/data/split.log new file mode 100644 index 00000000000..1ca6885952f --- /dev/null +++ b/test/integration/logs/benchmark/data/split.log @@ -0,0 +1,3 @@ +{"v": 0, +"msg": "Pod status updated"} +{"v": 0, "msg": "Pod status updated again"} diff --git a/test/integration/logs/benchmark/data/versionresponse.log b/test/integration/logs/benchmark/data/versionresponse.log new file mode 100644 index 00000000000..68339c5b56b --- /dev/null +++ b/test/integration/logs/benchmark/data/versionresponse.log @@ -0,0 +1 @@ +{"ts":1678174403964.6985,"caller":"remote/remote_runtime.go:147","msg":"[RemoteRuntimeService] Version Response","v":10,"apiVersion":{"version":"0.1.0","runtime_name":"containerd","runtime_version":"v1.6.18","runtime_api_version":"v1"}} diff --git a/test/integration/logs/benchmark/get-logs.sh b/test/integration/logs/benchmark/get-logs.sh new file mode 100755 index 00000000000..ec6c823a568 --- /dev/null +++ b/test/integration/logs/benchmark/get-logs.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash + +# Copyright 2018 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Usage: get-logs.sh [JOB_ID] +# +# Downloads the latest job output or the one with the specified ID +# and prepares running benchmarks for it. + +set -o pipefail +set -o errexit +set -x + +cd "$(dirname "$0")" + +latest_job () { + gsutil cat gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/latest-build.txt +} + +job=${1:-$(latest_job)} + +rm -rf ci-kubernetes-kind-e2e-json-logging +mkdir ci-kubernetes-kind-e2e-json-logging +gsutil -m cp -R "gs://kubernetes-jenkins/logs/ci-kubernetes-kind-e2e-json-logging/${job}/*" ci-kubernetes-kind-e2e-json-logging/ + +for i in kube-apiserver kube-controller-manager kube-scheduler; do + # Before (container runtime log dump (?)): + # 2023-03-07T07:30:52.193301924Z stderr F {"ts":1678174252192.0676,"caller":"scheduler/schedule_one.go:81","msg":"Attempting to schedule pod","v":3,"pod":{"name":"simpletest.rc-zgd47","namespace":"gc-5422"}} + # After: + # {"ts":1678174252192.0676,"caller":"scheduler/schedule_one.go:81","msg":"Attempting to schedule pod","v":3,"pod":{"name":"simpletest.rc-zgd47","namespace":"gc-5422"}} + sed -e 's/^20[^ ]* stderr . 
//' \ + ci-kubernetes-kind-e2e-json-logging/artifacts/kind-control-plane/containers/$i-*.log \ + > ci-kubernetes-kind-e2e-json-logging/$i.log; +done + +# Before (systemd format): +# Mar 07 07:22:05 kind-control-plane kubelet[288]: {"ts":1678173725722.4487,"caller":"flag/flags.go:64","msg":"FLAG: --address=\"0.0.0.0\"\n","v":1} +# After: +# {"ts":1678173725722.4487,"caller":"flag/flags.go:64","msg":"FLAG: --address=\"0.0.0.0\"\n","v":1} +grep 'kind-worker kubelet' ci-kubernetes-kind-e2e-json-logging/artifacts/kind-worker/kubelet.log | \ + sed -e 's;^.* kind-worker kubelet[^ ]*: ;;' > ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log + +# Create copies of the actual files, whether they already exist or not. To +# clean up disk space, use "git clean -fx test/integration/logs/benchmark". +copy () { + from="$1" + to="$2" + + mkdir -p "$(dirname "$to")" + rm -f "$to" + cp "$from" "$to" +} +copy ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log data/kind-worker-kubelet.log +copy ci-kubernetes-kind-e2e-json-logging/kube-apiserver.log data/kube-apiserver.log +copy ci-kubernetes-kind-e2e-json-logging/kube-controller-manager.log data/kube-controller-manager.log +copy ci-kubernetes-kind-e2e-json-logging/kube-scheduler.log data/kube-scheduler.log + +copy ci-kubernetes-kind-e2e-json-logging/kind-worker-kubelet.log data/v3/kind-worker-kubelet.log +copy ci-kubernetes-kind-e2e-json-logging/kube-apiserver.log data/v3/kube-apiserver.log +copy ci-kubernetes-kind-e2e-json-logging/kube-controller-manager.log data/v3/kube-controller-manager.log +copy ci-kubernetes-kind-e2e-json-logging/kube-scheduler.log data/v3/kube-scheduler.log diff --git a/test/integration/logs/benchmark/load.go b/test/integration/logs/benchmark/load.go index ccb5727b146..fea2f42ca96 100644 --- a/test/integration/logs/benchmark/load.go +++ b/test/integration/logs/benchmark/load.go @@ -29,7 +29,8 @@ import ( "strings" "text/template" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + runtimev1 
"k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" ) @@ -52,7 +53,7 @@ const ( ) type logStats struct { - TotalLines, JsonLines, ErrorMessages int + TotalLines, JsonLines, SplitLines, ErrorMessages int ArgCounts map[string]int OtherLines []string @@ -73,10 +74,11 @@ var ( return x - y }, }).Parse(`Total number of lines: {{.TotalLines}} +JSON line continuation: {{.SplitLines}} Valid JSON messages: {{.JsonLines}} ({{percent .JsonLines .TotalLines}} of total lines) Error messages: {{.ErrorMessages}} ({{percent .ErrorMessages .JsonLines}} of valid JSON messages) -Unrecognized lines: {{sub .TotalLines .JsonLines}} -{{range .OtherLines}} {{.}} +Unrecognized lines: {{sub (sub .TotalLines .JsonLines) .SplitLines}} +{{range .OtherLines}} {{if gt (len .) 80}}{{slice . 0 80}}{{else}}{{.}}{{end}} {{end}} Args: total: {{if .ArgCounts.total}}{{.ArgCounts.total}}{{else}}0{{end}}{{if .ArgCounts.string}} @@ -119,14 +121,28 @@ func loadLog(path string) (messages []logMessage, stats logStats, err error) { stats.ArgCounts = map[string]int{} scanner := bufio.NewScanner(file) + var buffer bytes.Buffer for lineNo := 0; scanner.Scan(); lineNo++ { + stats.TotalLines++ line := scanner.Bytes() - msg, err := parseLine(line, &stats) + buffer.Write(line) + msg, err := parseLine(buffer.Bytes(), &stats) if err != nil { + // JSON might have been split across multiple lines. + var jsonErr *json.SyntaxError + if errors.As(err, &jsonErr) && jsonErr.Offset > 1 { + // The start of the buffer was okay. Keep the + // data and add the next line to it. 
+ stats.SplitLines++ + continue + } stats.OtherLines = append(stats.OtherLines, fmt.Sprintf("%d: %s", lineNo, string(line))) + buffer.Reset() continue } + stats.JsonLines++ messages = append(messages, msg) + buffer.Reset() } if err := scanner.Err(); err != nil { @@ -136,26 +152,16 @@ func loadLog(path string) (messages []logMessage, stats logStats, err error) { return } -// systemd prefix: -// Nov 19 02:08:51 kind-worker2 kubelet[250]: {"ts":1637287731687.8315,... -// -// kubectl (?) prefix: -// 2021-11-19T02:08:28.475825534Z stderr F {"ts": ... -var prefixRE = regexp.MustCompile(`^\w+ \d+ \S+ \S+ \S+: |\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z stderr . `) - // String format for API structs from generated.pb.go. // &Container{...} var objectRE = regexp.MustCompile(`^&([a-zA-Z]*)\{`) func parseLine(line []byte, stats *logStats) (item logMessage, err error) { - stats.TotalLines++ - line = prefixRE.ReplaceAll(line, nil) content := map[string]interface{}{} if err := json.Unmarshal(line, &content); err != nil { - return logMessage{}, fmt.Errorf("JSON parsing failed: %v", err) + return logMessage{}, fmt.Errorf("JSON parsing failed: %w", err) } - stats.JsonLines++ kvs := map[string]interface{}{} item.isError = true @@ -244,6 +250,7 @@ func parseLine(line []byte, stats *logStats) (item logMessage, err error) { // fields are an error). 
var objectTypes = []reflect.Type{ reflect.TypeOf(klog.ObjectRef{}), + reflect.TypeOf(&runtimev1.VersionResponse{}), reflect.TypeOf(&v1.Pod{}), reflect.TypeOf(&v1.Container{}), } diff --git a/test/integration/logs/benchmark/load_test.go b/test/integration/logs/benchmark/load_test.go index 5ccd2e0b1c8..98fb77b21ad 100644 --- a/test/integration/logs/benchmark/load_test.go +++ b/test/integration/logs/benchmark/load_test.go @@ -19,24 +19,29 @@ package benchmark import ( "bytes" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" + logsapi "k8s.io/component-base/logs/api/v1" + _ "k8s.io/component-base/logs/json/register" + runtimev1 "k8s.io/cri-api/pkg/apis/runtime/v1" "k8s.io/klog/v2" ) func TestData(t *testing.T) { - container := v1.Container{ - Command: []string{"sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"}, - Image: "registry.k8s.io/e2e-test-images/busybox:1.29-2", - Name: "terminate-cmd-rpn", - TerminationMessagePath: "/dev/termination-log", + versionResponse := &runtimev1.VersionResponse{ + Version: "0.1.0", + RuntimeName: "containerd", + RuntimeVersion: "v1.6.18", + RuntimeApiVersion: "v1", } testcases := map[string]struct { - messages []logMessage + messages []logMessage + // These are subsets of the full output and may be empty. + // Prefix and variable stack traces therefore aren't compared. 
printf, structured, json string stats logStats }{ @@ -46,18 +51,31 @@ func TestData(t *testing.T) { msg: "Pod status updated", }, }, - printf: `Pod status updated: [] -`, - structured: `"Pod status updated" -`, - json: `{"msg":"Pod status updated","v":0} -`, + printf: `Pod status updated: []`, + structured: `"Pod status updated"`, + json: `"msg":"Pod status updated","v":0`, stats: logStats{ TotalLines: 1, JsonLines: 1, ArgCounts: map[string]int{}, }, }, + "data/split.log": { + messages: []logMessage{ + { + msg: "Pod status updated", + }, + { + msg: "Pod status updated again", + }, + }, + stats: logStats{ + TotalLines: 3, + SplitLines: 1, + JsonLines: 2, + ArgCounts: map[string]int{}, + }, + }, "data/error.log": { messages: []logMessage{ { @@ -66,12 +84,9 @@ func TestData(t *testing.T) { isError: true, }, }, - printf: `Pod status update: failed [] -`, - structured: `"Pod status update" err="failed" -`, - json: `{"msg":"Pod status update","err":"failed"} -`, + printf: `Pod status update: failed []`, + structured: `"Pod status update" err="failed"`, + json: `"msg":"Pod status update","err":"failed"`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -89,12 +104,9 @@ func TestData(t *testing.T) { kvs: []interface{}{"err", errors.New("failed")}, }, }, - printf: `Pod status update: [err failed] -`, - structured: `"Pod status update" err="failed" -`, - json: `{"msg":"Pod status update","v":0,"err":"failed"} -`, + printf: `Pod status update: [err failed]`, + structured: `"Pod status update" err="failed"`, + json: `"msg":"Pod status update","v":0,"err":"failed"`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -116,12 +128,9 @@ func TestData(t *testing.T) { }, }, }, - printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1] -`, - structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1 -`, - json: 
`{"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1} -`, + printf: `Example: [pod system/kube-scheduler pv volume someString hello world someValue 1]`, + structured: `"Example" pod="system/kube-scheduler" pv="volume" someString="hello world" someValue=1`, + json: `"msg":"Example","v":0,"pod":{"name":"kube-scheduler","namespace":"system"},"pv":{"name":"volume"},"someString":"hello world","someValue":1`, stats: logStats{ TotalLines: 1, JsonLines: 1, @@ -133,100 +142,78 @@ func TestData(t *testing.T) { }, }, }, - "data/container.log": { + "data/versionresponse.log": { messages: []logMessage{ { - msg: "Creating container in pod", + msg: "[RemoteRuntimeService] Version Response", + verbosity: 10, kvs: []interface{}{ - "container", &container, + "apiVersion", versionResponse, }, }, }, - printf: `Creating container in pod: [container &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c -f=/restart-count/restartCount -count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) -if [ $count -eq 1 ]; then - exit 1 -fi -if [ $count -eq 2 ]; then - exit 0 -fi -while true; do sleep 1; done -],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}] -`, - structured: `"Creating container in pod" container=< - &Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c - f=/restart-count/restartCount - count=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'}) - if [ 
$count -eq 1 ]; then - exit 1 - fi - if [ $count -eq 2 ]; then - exit 0 - fi - while true; do sleep 1; done - ],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},} - > -`, - // This is what the output would look like with JSON object. Because of https://github.com/kubernetes/kubernetes/issues/106652 we get the string instead. - // json: `{"msg":"Creating container in pod","v":0,"container":{"name":"terminate-cmd-rpn","image":"registry.k8s.io/e2e-test-images/busybox:1.29-2","command":["sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; done\n"],"resources":{},"terminationMessagePath":"/dev/termination-log"}} - // `, - json: `{"msg":"Creating container in pod","v":0,"container":"&Container{Name:terminate-cmd-rpn,Image:registry.k8s.io/e2e-test-images/busybox:1.29-2,Command:[sh -c \nf=/restart-count/restartCount\ncount=$(echo 'hello' >> $f ; wc -l $f | awk {'print $1'})\nif [ $count -eq 1 ]; then\n\texit 1\nfi\nif [ $count -eq 2 ]; then\n\texit 0\nfi\nwhile true; do sleep 1; 
done\n],Args:[],WorkingDir:,Ports:[]ContainerPort{},Env:[]EnvVar{},Resources:ResourceRequirements{Limits:ResourceList{},Requests:ResourceList{},Claims:[]ResourceClaim{},},VolumeMounts:[]VolumeMount{},LivenessProbe:nil,ReadinessProbe:nil,Lifecycle:nil,TerminationMessagePath:/dev/termination-log,ImagePullPolicy:,SecurityContext:nil,Stdin:false,StdinOnce:false,TTY:false,EnvFrom:[]EnvFromSource{},TerminationMessagePolicy:,VolumeDevices:[]VolumeDevice{},StartupProbe:nil,ResizePolicy:[]ContainerResizePolicy{},}"} -`, + printf: `[RemoteRuntimeService] Version Response: [apiVersion &VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}]`, + structured: `"[RemoteRuntimeService] Version Response" apiVersion="&VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}"`, + // Because of + // https://github.com/kubernetes/kubernetes/issues/106652 + // we get the string instead of a JSON struct. + json: `"msg":"[RemoteRuntimeService] Version Response","v":0,"apiVersion":"&VersionResponse{Version:0.1.0,RuntimeName:containerd,RuntimeVersion:v1.6.18,RuntimeApiVersion:v1,}"`, stats: logStats{ - TotalLines: 2, + TotalLines: 1, JsonLines: 1, ArgCounts: map[string]int{ totalArg: 1, otherArg: 1, }, - OtherLines: []string{ - "0: # This is a manually created message. 
See https://github.com/kubernetes/kubernetes/issues/106652 for the real one.", - }, OtherArgs: []interface{}{ - &container, + versionResponse, }, }, }, } - for path, expected := range testcases { - t.Run(path, func(t *testing.T) { - messages, stats, err := loadLog(path) + for filePath, expected := range testcases { + t.Run(filePath, func(t *testing.T) { + messages, stats, err := loadLog(filePath) if err != nil { t.Fatalf("unexpected load error: %v", err) } assert.Equal(t, expected.messages, messages) assert.Equal(t, expected.stats, stats) - print := func(format func(item logMessage)) { + printAll := func(format func(item logMessage)) { for _, item := range expected.messages { format(item) } } - testBuffered := func(t *testing.T, expected string, format func(item logMessage)) { + testBuffered := func(t *testing.T, expected string, format string, print func(item logMessage)) { var buffer bytes.Buffer + c := logsapi.NewLoggingConfiguration() + c.Format = format + o := logsapi.LoggingOptions{ + ErrorStream: &buffer, + InfoStream: &buffer, + } klog.SetOutput(&buffer) - defer klog.SetOutput(&output) + if err := logsapi.ValidateAndApplyWithOptions(c, &o, nil); err != nil { + t.Fatalf("Unexpected error configuring logging: %v", err) + } - print(format) + printAll(print) klog.Flush() - assert.Equal(t, expected, buffer.String()) + + if !strings.Contains(buffer.String(), expected) { + t.Errorf("Expected log output to contain:\n%s\nActual log output:\n%s\n", expected, buffer.String()) + } } t.Run("printf", func(t *testing.T) { - testBuffered(t, expected.printf, printf) + testBuffered(t, expected.printf, "text", printf) }) t.Run("structured", func(t *testing.T) { - testBuffered(t, expected.structured, prints) + testBuffered(t, expected.structured, "text", printLogger) }) t.Run("json", func(t *testing.T) { - var buffer bytes.Buffer - logger := newJSONLogger(&buffer) - klog.SetLogger(logger) - defer klog.ClearLogger() - print(prints) - klog.Flush() - assert.Equal(t, 
expected.json, buffer.String()) + testBuffered(t, expected.json, "json", printLogger) }) }) }