118 lines
4.0 KiB
Go
118 lines
4.0 KiB
Go
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
"k8s.io/component-base/version"
|
|
"k8s.io/klog/v2"
|
|
"k8s.io/kubernetes/pkg/probe"
|
|
)
|
|
|
|
// Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks.
|
|
type Prober interface {
|
|
Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error)
|
|
}
|
|
|
|
// grpcProber is the Prober implementation returned by New. It has no fields,
// so its zero value is ready to use.
type grpcProber struct{}
|
|
|
|
// New Prober for execute grpc probe
|
|
func New() Prober {
|
|
return grpcProber{}
|
|
}
|
|
|
|
// Probe executes a grpc call to check the liveness/readiness/startup of container.
|
|
// Returns the Result status, command output, and errors if any.
|
|
// Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior.
|
|
// err is always nil
|
|
func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) {
|
|
v := version.Get()
|
|
|
|
opts := []grpc.DialOption{
|
|
grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)),
|
|
grpc.WithBlock(),
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()), //credentials are currently not supported
|
|
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
|
|
return probe.ProbeDialer().DialContext(ctx, "tcp", addr)
|
|
}),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
|
|
defer cancel()
|
|
|
|
addr := net.JoinHostPort(host, fmt.Sprintf("%d", port))
|
|
conn, err := grpc.DialContext(ctx, addr, opts...)
|
|
|
|
if err != nil {
|
|
if err == context.DeadlineExceeded {
|
|
klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
|
|
return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil
|
|
} else {
|
|
klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
|
|
return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil
|
|
}
|
|
}
|
|
|
|
defer func() {
|
|
_ = conn.Close()
|
|
}()
|
|
|
|
client := grpchealth.NewHealthClient(conn)
|
|
|
|
resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{
|
|
Service: service,
|
|
})
|
|
|
|
if err != nil {
|
|
stat, ok := status.FromError(err)
|
|
if ok {
|
|
switch stat.Code() {
|
|
case codes.Unimplemented:
|
|
klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
|
|
return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil
|
|
case codes.DeadlineExceeded:
|
|
klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
|
|
return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil
|
|
default:
|
|
klog.V(4).ErrorS(err, "rpc probe failed")
|
|
}
|
|
} else {
|
|
klog.V(4).ErrorS(err, "health rpc probe failed")
|
|
}
|
|
|
|
return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil
|
|
}
|
|
|
|
if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
|
|
return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil
|
|
}
|
|
|
|
return probe.Success, fmt.Sprintf("service healthy"), nil
|
|
}
|