Merge pull request #105126 from sallyom/tracing-kubelet
kubelet tracing instrumentation
This commit is contained in:
@@ -499,6 +499,13 @@ const (
|
||||
// Enable POD resources API to return allocatable resources
|
||||
KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable"
|
||||
|
||||
// owner: @sallyom
|
||||
// kep: http://kep.k8s.io/2832
|
||||
// alpha: v1.25
|
||||
//
|
||||
// Add support for distributed tracing in the kubelet
|
||||
KubeletTracing featuregate.Feature = "KubeletTracing"
|
||||
|
||||
// owner: @zshihang
|
||||
// kep: http://kep.k8s.io/2800
|
||||
// beta: v1.24
|
||||
@@ -977,6 +984,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
KubeletPodResourcesGetAllocatable: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
KubeletTracing: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
LocalStorageCapacityIsolation: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
8
pkg/generated/openapi/zz_generated.openapi.go
generated
8
pkg/generated/openapi/zz_generated.openapi.go
generated
@@ -54583,11 +54583,17 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen
|
||||
Format: "",
|
||||
},
|
||||
},
|
||||
"tracing": {
|
||||
SchemaProps: spec.SchemaProps{
|
||||
Description: "Tracing specifies the versioned configuration for OpenTelemetry tracing clients. See http://kep.k8s.io/2832 for more details.",
|
||||
Ref: ref("k8s.io/component-base/tracing/api/v1.TracingConfiguration"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Dependencies: []string{
|
||||
"k8s.io/api/core/v1.Taint", "k8s.io/apimachinery/pkg/apis/meta/v1.Duration", "k8s.io/component-base/logs/api/v1.LoggingConfiguration", "k8s.io/kubelet/config/v1beta1.KubeletAuthentication", "k8s.io/kubelet/config/v1beta1.KubeletAuthorization", "k8s.io/kubelet/config/v1beta1.MemoryReservation", "k8s.io/kubelet/config/v1beta1.MemorySwapConfiguration", "k8s.io/kubelet/config/v1beta1.ShutdownGracePeriodByPodPriority"},
|
||||
"k8s.io/api/core/v1.Taint", "k8s.io/apimachinery/pkg/apis/meta/v1.Duration", "k8s.io/component-base/logs/api/v1.LoggingConfiguration", "k8s.io/component-base/tracing/api/v1.TracingConfiguration", "k8s.io/kubelet/config/v1beta1.KubeletAuthentication", "k8s.io/kubelet/config/v1beta1.KubeletAuthorization", "k8s.io/kubelet/config/v1beta1.MemoryReservation", "k8s.io/kubelet/config/v1beta1.MemorySwapConfiguration", "k8s.io/kubelet/config/v1beta1.ShutdownGracePeriodByPodPriority"},
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -47,7 +47,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// New sets up the plugins and admission start hooks needed for admission
|
||||
func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp *trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
|
||||
func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
|
||||
webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp)
|
||||
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver)
|
||||
|
||||
|
@@ -280,5 +280,7 @@ var (
|
||||
"ShutdownGracePeriod.Duration",
|
||||
"ShutdownGracePeriodCriticalPods.Duration",
|
||||
"MemoryThrottlingFactor",
|
||||
"Tracing.Endpoint",
|
||||
"Tracing.SamplingRatePerMillion",
|
||||
)
|
||||
)
|
||||
|
@@ -24,6 +24,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
componentconfigtesting "k8s.io/component-base/config/testing"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
)
|
||||
|
||||
func TestComponentConfigSetup(t *testing.T) {
|
||||
@@ -33,11 +34,12 @@ func TestComponentConfigSetup(t *testing.T) {
|
||||
SchemeGroupVersion: SchemeGroupVersion,
|
||||
AddToScheme: AddToScheme,
|
||||
AllowedTags: map[reflect.Type]bool{
|
||||
reflect.TypeOf(logsapi.LoggingConfiguration{}): true,
|
||||
reflect.TypeOf(metav1.Duration{}): true,
|
||||
reflect.TypeOf(metav1.TypeMeta{}): true,
|
||||
reflect.TypeOf(v1.NodeConfigSource{}): true,
|
||||
reflect.TypeOf(v1.Taint{}): true,
|
||||
reflect.TypeOf(logsapi.LoggingConfiguration{}): true,
|
||||
reflect.TypeOf(tracingapi.TracingConfiguration{}): true,
|
||||
reflect.TypeOf(metav1.Duration{}): true,
|
||||
reflect.TypeOf(metav1.TypeMeta{}): true,
|
||||
reflect.TypeOf(v1.NodeConfigSource{}): true,
|
||||
reflect.TypeOf(v1.Taint{}): true,
|
||||
},
|
||||
}
|
||||
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
)
|
||||
|
||||
// HairpinMode denotes how the kubelet should configure networking to handle
|
||||
@@ -441,10 +442,14 @@ type KubeletConfiguration struct {
|
||||
// is true and upon the initial registration of the node.
|
||||
// +optional
|
||||
RegisterWithTaints []v1.Taint
|
||||
|
||||
// registerNode enables automatic registration with the apiserver.
|
||||
// +optional
|
||||
RegisterNode bool
|
||||
// Tracing specifies the versioned configuration for OpenTelemetry tracing clients.
|
||||
// See http://kep.k8s.io/2832 for more details.
|
||||
// +featureGate=KubeletTracing
|
||||
// +optional
|
||||
Tracing *tracingapi.TracingConfiguration
|
||||
}
|
||||
|
||||
// KubeletAuthorizationMode denotes the authorization mode for the kubelet
|
||||
|
@@ -28,6 +28,7 @@ import (
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
conversion "k8s.io/apimachinery/pkg/conversion"
|
||||
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||
apiv1 "k8s.io/component-base/tracing/api/v1"
|
||||
v1beta1 "k8s.io/kubelet/config/v1beta1"
|
||||
config "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
)
|
||||
@@ -506,6 +507,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
|
||||
if err := v1.Convert_Pointer_bool_To_bool(&in.RegisterNode, &out.RegisterNode, s); err != nil {
|
||||
return err
|
||||
}
|
||||
out.Tracing = (*apiv1.TracingConfiguration)(unsafe.Pointer(in.Tracing))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -680,6 +682,7 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
|
||||
if err := v1.Convert_bool_To_Pointer_bool(&in.RegisterNode, &out.RegisterNode, s); err != nil {
|
||||
return err
|
||||
}
|
||||
out.Tracing = (*apiv1.TracingConfiguration)(unsafe.Pointer(in.Tracing))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -27,6 +27,7 @@ import (
|
||||
"k8s.io/component-base/featuregate"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
"k8s.io/component-base/metrics"
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
|
||||
@@ -241,6 +242,14 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
|
||||
allErrors = append(allErrors, errs.ToAggregate().Errors()...)
|
||||
}
|
||||
|
||||
if localFeatureGate.Enabled(features.KubeletTracing) {
|
||||
if errs := tracingapi.ValidateTracingConfiguration(kc.Tracing, localFeatureGate, field.NewPath("tracing")); len(errs) > 0 {
|
||||
allErrors = append(allErrors, errs.ToAggregate().Errors()...)
|
||||
}
|
||||
} else if kc.Tracing != nil {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled."))
|
||||
}
|
||||
|
||||
if localFeatureGate.Enabled(features.MemoryQoS) && kc.MemoryThrottlingFactor == nil {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid configuration: memoryThrottlingFactor is required when MemoryQoS feature flag is enabled"))
|
||||
}
|
||||
|
@@ -25,6 +25,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
logsapi "k8s.io/component-base/logs/api/v1"
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
@@ -497,6 +498,36 @@ func TestValidateKubeletConfiguration(t *testing.T) {
|
||||
},
|
||||
errMsg: "invalid configuration: taint.TimeAdded is not nil",
|
||||
},
|
||||
{
|
||||
name: "specify tracing with KubeletTracing disabled",
|
||||
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
|
||||
samplingRate := int32(99999)
|
||||
conf.FeatureGates = map[string]bool{"KubeletTracing": false}
|
||||
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
|
||||
return conf
|
||||
},
|
||||
errMsg: "invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled.",
|
||||
},
|
||||
{
|
||||
name: "specify tracing invalid sampling rate",
|
||||
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
|
||||
samplingRate := int32(-1)
|
||||
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
|
||||
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
|
||||
return conf
|
||||
},
|
||||
errMsg: "tracing.samplingRatePerMillion: Invalid value: -1: sampling rate must be positive",
|
||||
},
|
||||
{
|
||||
name: "specify tracing invalid endpoint",
|
||||
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
|
||||
ep := "dn%2s://localhost:4317"
|
||||
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
|
||||
conf.Tracing = &tracingapi.TracingConfiguration{Endpoint: &ep}
|
||||
return conf
|
||||
},
|
||||
errMsg: "tracing.endpoint: Invalid value: \"dn%2s://localhost:4317\": parse \"dn%2s://localhost:4317\": first path segment in URL cannot contain colon",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
|
6
pkg/kubelet/apis/config/zz_generated.deepcopy.go
generated
6
pkg/kubelet/apis/config/zz_generated.deepcopy.go
generated
@@ -25,6 +25,7 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||
apiv1 "k8s.io/component-base/tracing/api/v1"
|
||||
)
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
@@ -307,6 +308,11 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
|
||||
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||
}
|
||||
}
|
||||
if in.Tracing != nil {
|
||||
in, out := &in.Tracing, &out.Tracing
|
||||
*out = new(apiv1.TracingConfiguration)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -23,16 +23,20 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/status"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/logs/logreduction"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/probe/exec"
|
||||
utilexec "k8s.io/utils/exec"
|
||||
@@ -68,7 +72,7 @@ const (
|
||||
)
|
||||
|
||||
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
|
||||
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
|
||||
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
|
||||
klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
|
||||
addr, dialer, err := util.GetAddressAndDialer(endpoint)
|
||||
if err != nil {
|
||||
@@ -77,10 +81,23 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (
|
||||
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, addr,
|
||||
dialOpts := []grpc.DialOption{}
|
||||
dialOpts = append(dialOpts,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithContextDialer(dialer),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
|
||||
tracingOpts := []otelgrpc.Option{
|
||||
otelgrpc.WithPropagators(tracing.Propagators()),
|
||||
otelgrpc.WithTracerProvider(tp),
|
||||
}
|
||||
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
|
||||
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
|
||||
dialOpts = append(dialOpts,
|
||||
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
|
||||
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
|
||||
}
|
||||
conn, err := grpc.DialContext(ctx, addr, dialOpts...)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
|
||||
return nil, err
|
||||
|
@@ -17,14 +17,23 @@ limitations under the License.
|
||||
package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
internalapi "k8s.io/cri-api/pkg/apis"
|
||||
//kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
apitest "k8s.io/cri-api/pkg/apis/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
)
|
||||
@@ -47,12 +56,45 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
|
||||
}
|
||||
|
||||
func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService {
|
||||
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout)
|
||||
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider())
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
return runtimeService
|
||||
}
|
||||
|
||||
func createRemoteRuntimeServiceWithTracerProvider(endpoint string, tp oteltrace.TracerProvider, t *testing.T) internalapi.RuntimeService {
|
||||
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout, tp)
|
||||
require.NoError(t, err)
|
||||
|
||||
return runtimeService
|
||||
}
|
||||
|
||||
func TestGetSpans(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletTracing, true)()
|
||||
fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
|
||||
defer func() {
|
||||
fakeRuntime.Stop()
|
||||
// clear endpoint file
|
||||
if addr, _, err := util.GetAddressAndDialer(endpoint); err == nil {
|
||||
if _, err := os.Stat(addr); err == nil {
|
||||
os.Remove(addr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
exp := tracetest.NewInMemoryExporter()
|
||||
tp := sdktrace.NewTracerProvider(
|
||||
sdktrace.WithBatcher(exp),
|
||||
)
|
||||
ctx := context.Background()
|
||||
rtSvc := createRemoteRuntimeServiceWithTracerProvider(endpoint, tp, t)
|
||||
_, err := rtSvc.Version(apitest.FakeVersion)
|
||||
require.NoError(t, err)
|
||||
err = tp.ForceFlush(ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, exp.GetSpans())
|
||||
}
|
||||
|
||||
func TestVersion(t *testing.T) {
|
||||
fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
|
||||
defer func() {
|
||||
@@ -65,8 +107,8 @@ func TestVersion(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
r := createRemoteRuntimeService(endpoint, t)
|
||||
version, err := r.Version(apitest.FakeVersion)
|
||||
rtSvc := createRemoteRuntimeService(endpoint, t)
|
||||
version, err := rtSvc.Version(apitest.FakeVersion)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, apitest.FakeVersion, version.Version)
|
||||
assert.Equal(t, apitest.FakeRuntimeName, version.RuntimeName)
|
||||
|
@@ -38,6 +38,7 @@ import (
|
||||
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"k8s.io/mount-utils"
|
||||
"k8s.io/utils/integer"
|
||||
netutils "k8s.io/utils/net"
|
||||
@@ -206,7 +207,7 @@ type Bootstrap interface {
|
||||
GetConfiguration() kubeletconfiginternal.KubeletConfiguration
|
||||
BirthCry()
|
||||
StartGarbageCollection()
|
||||
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface)
|
||||
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
|
||||
ListenAndServeReadOnly(address net.IP, port uint)
|
||||
ListenAndServePodResources()
|
||||
Run(<-chan kubetypes.PodUpdate)
|
||||
@@ -236,6 +237,7 @@ type Dependencies struct {
|
||||
ProbeManager prober.Manager
|
||||
Recorder record.EventRecorder
|
||||
Subpather subpath.Interface
|
||||
TracerProvider trace.TracerProvider
|
||||
VolumePlugins []volume.VolumePlugin
|
||||
DynamicPluginProber volume.DynamicPluginProber
|
||||
TLSOptions *server.TLSOptions
|
||||
@@ -293,7 +295,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
}
|
||||
|
||||
var err error
|
||||
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
|
||||
if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
|
||||
return err
|
||||
}
|
||||
if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
|
||||
@@ -2395,8 +2397,8 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
|
||||
|
||||
// ListenAndServe runs the kubelet HTTP server.
|
||||
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
|
||||
auth server.AuthInterface) {
|
||||
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth)
|
||||
auth server.AuthInterface, tp trace.TracerProvider) {
|
||||
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
|
||||
}
|
||||
|
||||
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
|
||||
|
@@ -37,6 +37,8 @@ import (
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
cadvisorv2 "github.com/google/cadvisor/info/v2"
|
||||
"github.com/google/cadvisor/metrics"
|
||||
"go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
|
||||
@@ -127,6 +129,7 @@ type containerInterface interface {
|
||||
// so we can ensure restful.FilterFunctions are used for all handlers
|
||||
type filteringContainer struct {
|
||||
*restful.Container
|
||||
|
||||
registeredHandlePaths []string
|
||||
}
|
||||
|
||||
@@ -144,12 +147,13 @@ func ListenAndServeKubeletServer(
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
tlsOptions *TLSOptions,
|
||||
auth AuthInterface) {
|
||||
auth AuthInterface,
|
||||
tp oteltrace.TracerProvider) {
|
||||
|
||||
address := netutils.ParseIPSloppy(kubeCfg.Address)
|
||||
port := uint(kubeCfg.Port)
|
||||
klog.InfoS("Starting to listen", "address", address, "port", port)
|
||||
handler := NewServer(host, resourceAnalyzer, auth, kubeCfg)
|
||||
handler := NewServer(host, resourceAnalyzer, auth, tp, kubeCfg)
|
||||
s := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||
Handler: &handler,
|
||||
@@ -158,6 +162,7 @@ func ListenAndServeKubeletServer(
|
||||
WriteTimeout: 4 * 60 * time.Minute,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
|
||||
if tlsOptions != nil {
|
||||
s.TLSConfig = tlsOptions.Config
|
||||
// Passing empty strings as the cert and key files means no
|
||||
@@ -174,9 +179,14 @@ func ListenAndServeKubeletServer(
|
||||
}
|
||||
|
||||
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
|
||||
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) {
|
||||
func ListenAndServeKubeletReadOnlyServer(
|
||||
host HostInterface,
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
address net.IP,
|
||||
port uint) {
|
||||
klog.InfoS("Starting to listen read-only", "address", address, "port", port)
|
||||
s := NewServer(host, resourceAnalyzer, nil, nil)
|
||||
// TODO: https://github.com/kubernetes/kubernetes/issues/109829 tracer should use WithPublicEndpoint
|
||||
s := NewServer(host, resourceAnalyzer, nil, oteltrace.NewNoopTracerProvider(), nil)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||
@@ -241,7 +251,9 @@ func NewServer(
|
||||
host HostInterface,
|
||||
resourceAnalyzer stats.ResourceAnalyzer,
|
||||
auth AuthInterface,
|
||||
tp oteltrace.TracerProvider,
|
||||
kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
|
||||
|
||||
server := Server{
|
||||
host: host,
|
||||
resourceAnalyzer: resourceAnalyzer,
|
||||
@@ -253,6 +265,9 @@ func NewServer(
|
||||
if auth != nil {
|
||||
server.InstallAuthFilter()
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
|
||||
server.InstallTracingFilter(tp)
|
||||
}
|
||||
server.InstallDefaultHandlers()
|
||||
if kubeCfg != nil && kubeCfg.EnableDebuggingHandlers {
|
||||
server.InstallDebuggingHandlers()
|
||||
@@ -305,6 +320,11 @@ func (s *Server) InstallAuthFilter() {
|
||||
})
|
||||
}
|
||||
|
||||
// InstallTracingFilter installs OpenTelemetry tracing filter with the restful Container.
|
||||
func (s *Server) InstallTracingFilter(tp oteltrace.TracerProvider) {
|
||||
s.restfulCont.Filter(otelrestful.OTelFilter("kubelet", otelrestful.WithTracerProvider(tp)))
|
||||
}
|
||||
|
||||
// addMetricsBucketMatcher adds a regexp matcher and the relevant bucket to use when
|
||||
// it matches. Please be aware this is not thread safe and should not be used dynamically
|
||||
func (s *Server) addMetricsBucketMatcher(bucket string) {
|
||||
|
@@ -37,6 +37,7 @@ import (
|
||||
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -358,6 +359,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo
|
||||
fw.fakeKubelet,
|
||||
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
|
||||
fw.fakeAuth,
|
||||
oteltrace.NewNoopTracerProvider(),
|
||||
kubeCfg,
|
||||
)
|
||||
fw.serverUnderTest = &server
|
||||
|
Reference in New Issue
Block a user