implement :grpc probe action

This commit is contained in:
Sergey Kanzhelev
2021-11-16 19:20:49 +00:00
parent cd6ffff85d
commit b7affcced1
95 changed files with 23730 additions and 20464 deletions

111
pkg/probe/grpc/grpc.go Normal file
View File

@@ -0,0 +1,111 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"context"
"fmt"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/probe"
)
// Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks.
type Prober interface {
Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error)
}
type grpcProber struct {
}
// New Prober for execute grpc probe
func New() Prober {
return grpcProber{}
}
// Probe executes a grpc call to check the liveness/readiness/startup of container.
// Returns the Result status, command output, and errors if any.
// Only return non-nil error when service is unavailable and/or not implementing the interface,
// otherwise result status is failed,BUT err is nil
func (p grpcProber) Probe(host, service string, port int, timeout time.Duration, opts ...grpc.DialOption) (probe.Result, string, error) {
v := version.Get()
md := metadata.New(map[string]string{
"User-Agent": fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor),
})
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
addr := net.JoinHostPort(host, fmt.Sprintf("%d", port))
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
if err == context.DeadlineExceeded {
klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("GRPC probe failed to dial: %s", err), nil
} else {
klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
return probe.Failure, "", fmt.Errorf("GRPC probe failed to dial: %w", err)
}
}
defer func() {
_ = conn.Close()
}()
client := grpchealth.NewHealthClient(conn)
resp, err := client.Check(metadata.NewOutgoingContext(ctx, md), &grpchealth.HealthCheckRequest{
Service: service,
})
if err != nil {
state, ok := status.FromError(err)
if ok {
switch state.Code() {
case codes.Unimplemented:
klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
return probe.Failure, "", fmt.Errorf("server does not implement the grpc health protocol: %w", err)
case codes.DeadlineExceeded:
klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
return probe.Failure, fmt.Sprintf("GRPC probe failed with DeadlineExceeded"), nil
default:
klog.V(4).ErrorS(err, "rpc probe failed")
}
} else {
klog.V(4).ErrorS(err, "health rpc probe failed")
}
return probe.Failure, "", fmt.Errorf("health rpc probe failed: %w", err)
}
if resp.Status != grpchealth.HealthCheckResponse_SERVING {
return probe.Failure, fmt.Sprintf("GRPC probe failed with status: %s", resp.Status.String()), nil
}
return probe.Success, fmt.Sprintf("GRPC probe success"), nil
}

186
pkg/probe/grpc/grpc_test.go Normal file
View File

@@ -0,0 +1,186 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grpc
import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/kubernetes/pkg/probe"
)
func TestNew(t *testing.T) {
t.Run("Should: implement Probe interface", func(t *testing.T) {
s := New()
assert.Implements(t, (*Prober)(nil), s)
})
}
type successServerMock struct {
}
func (s successServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
return &grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
}, nil
}
func (s successServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
return stream.Send(&grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
})
}
type errorTimeoutServerMock struct {
}
func (e errorTimeoutServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
time.Sleep(time.Second * 4)
return &grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
}, nil
}
func (e errorTimeoutServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
time.Sleep(time.Second * 4)
return stream.Send(&grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_SERVING,
})
}
type errorNotServeServerMock struct {
}
func (e errorNotServeServerMock) Check(context.Context, *grpchealth.HealthCheckRequest) (*grpchealth.HealthCheckResponse, error) {
return &grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_NOT_SERVING,
}, nil
}
func (e errorNotServeServerMock) Watch(_ *grpchealth.HealthCheckRequest, stream grpchealth.Health_WatchServer) error {
return stream.Send(&grpchealth.HealthCheckResponse{
Status: grpchealth.HealthCheckResponse_NOT_SERVING,
})
}
func TestGrpcProber_Probe(t *testing.T) {
t.Run("Should: failed but return nil error because cant find host", func(t *testing.T) {
s := New()
p, o, err := s.Probe("", "", 32, time.Second, grpc.WithInsecure(), grpc.WithBlock())
assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed to dial: context deadline exceeded", o)
})
t.Run("Should: return nil error because connection closed", func(t *testing.T) {
s := New()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "res")
}))
u := strings.Split(server.URL, ":")
assert.Equal(t, 3, len(u))
port, err := strconv.Atoi(u[2])
assert.Equal(t, nil, err)
// take some time to wait server boot
time.Sleep(2 * time.Second)
p, _, err := s.Probe("127.0.0.1", "", port, time.Second, grpc.WithInsecure())
assert.Equal(t, probe.Failure, p)
assert.NotEqual(t, nil, err)
})
t.Run("Should: return nil error because server response not served", func(t *testing.T) {
s := New()
lis, _ := net.Listen("tcp", ":0")
port := lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
grpchealth.RegisterHealthServer(grpcServer, &errorNotServeServerMock{})
go func() {
_ = grpcServer.Serve(lis)
}()
// take some time to wait server boot
time.Sleep(2 * time.Second)
p, o, err := s.Probe("0.0.0.0", "", port, time.Second, grpc.WithInsecure())
assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed with status: NOT_SERVING", o)
})
t.Run("Should: return nil-error because server not response in time", func(t *testing.T) {
s := New()
lis, _ := net.Listen("tcp", ":0")
port := lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
grpchealth.RegisterHealthServer(grpcServer, &errorTimeoutServerMock{})
go func() {
_ = grpcServer.Serve(lis)
}()
// take some time to wait server boot
time.Sleep(2 * time.Second)
p, o, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure())
assert.Equal(t, probe.Failure, p)
assert.Equal(t, nil, err)
assert.Equal(t, "GRPC probe failed with DeadlineExceeded", o)
})
t.Run("Should: not return error because check was success", func(t *testing.T) {
s := New()
lis, _ := net.Listen("tcp", ":0")
port := lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
grpchealth.RegisterHealthServer(grpcServer, &successServerMock{})
go func() {
_ = grpcServer.Serve(lis)
}()
// take some time to wait server boot
time.Sleep(2 * time.Second)
p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure())
assert.Equal(t, probe.Success, p)
assert.Equal(t, nil, err)
})
t.Run("Should: not return error because check was success, when listen port is 0", func(t *testing.T) {
s := New()
lis, _ := net.Listen("tcp", ":0")
port := lis.Addr().(*net.TCPAddr).Port
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
grpchealth.RegisterHealthServer(grpcServer, &successServerMock{})
go func() {
_ = grpcServer.Serve(lis)
}()
// take some time to wait server boot
time.Sleep(2 * time.Second)
p, _, err := s.Probe("0.0.0.0", "", port, time.Second*2, grpc.WithInsecure())
assert.Equal(t, probe.Success, p)
assert.Equal(t, nil, err)
})
}