Merge pull request #3695 from mikedanese/ready

refactor pkg/health into more reusable pkg/probe
This commit is contained in:
Tim Hockin
2015-01-28 11:00:32 -08:00
28 changed files with 658 additions and 863 deletions

View File

@@ -39,11 +39,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -84,8 +84,8 @@ func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.Pod
return c.GetPodStatus("localhost", podNamespace, podID) return c.GetPodStatus("localhost", podNamespace, podID)
} }
func (fakeKubeletClient) HealthCheck(host string) (health.Status, error) { func (fakeKubeletClient) HealthCheck(host string) (probe.Status, error) {
return health.Healthy, nil return probe.Success, nil
} }
type delegateHandler struct { type delegateHandler struct {

View File

@@ -24,7 +24,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
) )
// TODO: this basic interface is duplicated in N places. consolidate? // TODO: this basic interface is duplicated in N places. consolidate?
@@ -45,29 +45,30 @@ type validator struct {
client httpGet client httpGet
} }
func (s *Server) check(client httpGet) (health.Status, string, error) { // TODO: can this use pkg/probe/http
func (s *Server) check(client httpGet) (probe.Status, string, error) {
resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path) resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path)
if err != nil { if err != nil {
return health.Unknown, "", err return probe.Unknown, "", err
} }
defer resp.Body.Close() defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body) data, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return health.Unknown, string(data), err return probe.Unknown, string(data), err
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return health.Unhealthy, string(data), return probe.Failure, string(data),
fmt.Errorf("unhealthy http status code: %d (%s)", resp.StatusCode, resp.Status) fmt.Errorf("unhealthy http status code: %d (%s)", resp.StatusCode, resp.Status)
} }
return health.Healthy, string(data), nil return probe.Success, string(data), nil
} }
type ServerStatus struct { type ServerStatus struct {
Component string `json:"component,omitempty"` Component string `json:"component,omitempty"`
Health string `json:"health,omitempty"` Health string `json:"health,omitempty"`
HealthCode health.Status `json:"healthCode,omitempty"` HealthCode probe.Status `json:"healthCode,omitempty"`
Msg string `json:"msg,omitempty"` Msg string `json:"msg,omitempty"`
Err string `json:"err,omitempty"` Err string `json:"err,omitempty"`
} }
func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {

View File

@@ -25,7 +25,7 @@ import (
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
@@ -54,13 +54,13 @@ func TestValidate(t *testing.T) {
tests := []struct { tests := []struct {
err error err error
data string data string
expectedStatus health.Status expectedStatus probe.Status
code int code int
expectErr bool expectErr bool
}{ }{
{fmt.Errorf("test error"), "", health.Unknown, 500 /*ignored*/, true}, {fmt.Errorf("test error"), "", probe.Unknown, 500 /*ignored*/, true},
{nil, "foo", health.Healthy, 200, false}, {nil, "foo", probe.Success, 200, false},
{nil, "foo", health.Unhealthy, 500, true}, {nil, "foo", probe.Failure, 500, true},
} }
s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"} s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}

View File

@@ -26,7 +26,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
) )
// ErrPodInfoNotAvailable may be returned when the requested pod info is not available. // ErrPodInfoNotAvailable may be returned when the requested pod info is not available.
@@ -40,7 +41,7 @@ type KubeletClient interface {
// KubeletHealthchecker is an interface for healthchecking kubelets // KubeletHealthchecker is an interface for healthchecking kubelets
type KubeletHealthChecker interface { type KubeletHealthChecker interface {
HealthCheck(host string) (health.Status, error) HealthCheck(host string) (probe.Status, error)
} }
// PodInfoGetter is an interface for things that can get information about a pod's containers. // PodInfoGetter is an interface for things that can get information about a pod's containers.
@@ -146,8 +147,8 @@ func (c *HTTPKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.
return status, nil return status, nil
} }
func (c *HTTPKubeletClient) HealthCheck(host string) (health.Status, error) { func (c *HTTPKubeletClient) HealthCheck(host string) (probe.Status, error) {
return health.DoHTTPCheck(fmt.Sprintf("%s/healthz", c.url(host)), c.Client) return httprobe.DoHTTPProbe(fmt.Sprintf("%s/healthz", c.url(host)), c.Client)
} }
// FakeKubeletClient is a fake implementation of KubeletClient which returns an error // FakeKubeletClient is a fake implementation of KubeletClient which returns an error
@@ -160,6 +161,6 @@ func (c FakeKubeletClient) GetPodStatus(host, podNamespace string, podID string)
return api.PodStatusResult{}, errors.New("Not Implemented") return api.PodStatusResult{}, errors.New("Not Implemented")
} }
func (c FakeKubeletClient) HealthCheck(host string) (health.Status, error) { func (c FakeKubeletClient) HealthCheck(host string) (probe.Status, error) {
return health.Unknown, errors.New("Not Implemented") return probe.Unknown, errors.New("Not Implemented")
} }

View File

@@ -26,7 +26,7 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
@@ -134,8 +134,8 @@ func TestNewKubeletClient(t *testing.T) {
host := "127.0.0.1" host := "127.0.0.1"
healthStatus, err := client.HealthCheck(host) healthStatus, err := client.HealthCheck(host)
if healthStatus != health.Unhealthy { if healthStatus != probe.Failure {
t.Errorf("Expected %v and got %v.", health.Unhealthy, healthStatus) t.Errorf("Expected %v and got %v.", probe.Failure, healthStatus)
} }
if err != nil { if err != nil {
t.Error("Expected a nil error") t.Error("Expected a nil error")

View File

@@ -1,59 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
const defaultHealthyOutput = "ok"
type CommandRunner interface {
RunInContainer(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
}
type ExecHealthChecker struct {
runner CommandRunner
}
func NewExecHealthChecker(runner CommandRunner) HealthChecker {
return &ExecHealthChecker{runner}
}
func (e *ExecHealthChecker) HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (Status, error) {
if container.LivenessProbe.Exec == nil {
return Unknown, fmt.Errorf("missing exec parameters")
}
data, err := e.runner.RunInContainer(podFullName, podUID, container.Name, container.LivenessProbe.Exec.Command)
glog.V(1).Infof("container %s health check response: %s", podFullName, string(data))
if err != nil {
return Unknown, err
}
if strings.ToLower(string(data)) != defaultHealthyOutput {
return Unhealthy, nil
}
return Healthy, nil
}
func (e *ExecHealthChecker) CanCheck(probe *api.LivenessProbe) bool {
return probe.Exec != nil
}

View File

@@ -1,85 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
type FakeExec struct {
cmd []string
out []byte
err error
}
func (f *FakeExec) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
f.cmd = cmd
return f.out, f.err
}
type healthCheckTest struct {
expectedStatus Status
probe *api.LivenessProbe
expectError bool
output []byte
err error
}
func TestExec(t *testing.T) {
fake := FakeExec{}
checker := ExecHealthChecker{&fake}
tests := []healthCheckTest{
// Missing parameters
{Unknown, &api.LivenessProbe{}, true, nil, nil},
// Ok
{Healthy, &api.LivenessProbe{
Exec: &api.ExecAction{Command: []string{"ls", "-l"}},
}, false, []byte("OK"), nil},
// Run returns error
{Unknown, &api.LivenessProbe{
Exec: &api.ExecAction{
Command: []string{"ls", "-l"},
},
}, true, []byte("OK, NOT"), fmt.Errorf("test error")},
// Unhealthy
{Unhealthy, &api.LivenessProbe{
Exec: &api.ExecAction{Command: []string{"ls", "-l"}},
}, false, []byte("Fail"), nil},
}
for _, test := range tests {
fake.out = test.output
fake.err = test.err
status, err := checker.HealthCheck("test", "", api.PodStatus{}, api.Container{LivenessProbe: test.probe})
if status != test.expectedStatus {
t.Errorf("expected %v, got %v", test.expectedStatus, status)
}
if err != nil && test.expectError == false {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectError == true {
t.Errorf("unexpected non-error")
}
if test.probe.Exec != nil && !reflect.DeepEqual(fake.cmd, test.probe.Exec.Command) {
t.Errorf("expected: %v, got %v", test.probe.Exec.Command, fake.cmd)
}
}
}

View File

@@ -1,115 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/golang/glog"
)
// Status represents the result of a single health-check operation.
type Status int
// Status values must be one of these constants.
const (
Healthy Status = iota
Unhealthy
Unknown
)
// HealthChecker defines an abstract interface for checking container health.
type HealthChecker interface {
HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (Status, error)
CanCheck(probe *api.LivenessProbe) bool
}
// protects allCheckers
var checkerLock = sync.Mutex{}
var allCheckers = []HealthChecker{}
// AddHealthChecker adds a health checker to the list of known HealthChecker objects.
// Any subsequent call to NewHealthChecker will know about this HealthChecker.
func AddHealthChecker(checker HealthChecker) {
checkerLock.Lock()
defer checkerLock.Unlock()
allCheckers = append(allCheckers, checker)
}
// NewHealthChecker creates a new HealthChecker which supports multiple types of liveness probes.
func NewHealthChecker() HealthChecker {
checkerLock.Lock()
defer checkerLock.Unlock()
return &muxHealthChecker{
checkers: append([]HealthChecker{}, allCheckers...),
}
}
// muxHealthChecker bundles multiple implementations of HealthChecker of different types.
type muxHealthChecker struct {
// Given a LivenessProbe, cycle through each known checker and see if it supports
// the specific kind of probe (by returning non-nil).
checkers []HealthChecker
}
func (m *muxHealthChecker) findCheckerFor(probe *api.LivenessProbe) HealthChecker {
for i := range m.checkers {
if m.checkers[i].CanCheck(probe) {
return m.checkers[i]
}
}
return nil
}
// HealthCheck delegates the health-checking of the container to one of the bundled implementations.
// If there is no health checker that can check container it returns Unknown, nil.
func (m *muxHealthChecker) HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (Status, error) {
checker := m.findCheckerFor(container.LivenessProbe)
if checker == nil {
glog.Warningf("Failed to find health checker for %s %+v", container.Name, container.LivenessProbe)
return Unknown, nil
}
return checker.HealthCheck(podFullName, podUID, status, container)
}
func (m *muxHealthChecker) CanCheck(probe *api.LivenessProbe) bool {
return m.findCheckerFor(probe) != nil
}
// findPortByName is a helper function to look up a port in a container by name.
// Returns the HostPort if found, -1 if not found.
func findPortByName(container api.Container, portName string) int {
for _, port := range container.Ports {
if port.Name == portName {
return port.HostPort
}
}
return -1
}
func (s Status) String() string {
switch s {
case Healthy:
return "healthy"
case Unhealthy:
return "unhealthy"
default:
return "unknown"
}
}

View File

@@ -1,142 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
const statusServerEarlyShutdown = -1
func TestHealthChecker(t *testing.T) {
AddHealthChecker(&HTTPHealthChecker{client: &http.Client{}})
var healthCheckerTests = []struct {
status int
health Status
}{
{http.StatusOK, Healthy},
{statusServerEarlyShutdown, Unhealthy},
{http.StatusBadRequest, Unhealthy},
{http.StatusBadGateway, Unhealthy},
{http.StatusInternalServerError, Unhealthy},
}
for _, healthCheckerTest := range healthCheckerTests {
tt := healthCheckerTest
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tt.status)
}))
defer ts.Close()
u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if tt.status == statusServerEarlyShutdown {
ts.Close()
}
container := api.Container{
LivenessProbe: &api.LivenessProbe{
HTTPGet: &api.HTTPGetAction{
Port: util.NewIntOrStringFromString(port),
Path: "/foo/bar",
Host: host,
},
},
}
hc := NewHealthChecker()
health, err := hc.HealthCheck("test", "", api.PodStatus{}, container)
if err != nil && tt.health != Unhealthy {
t.Errorf("Unexpected error: %v", err)
}
if health != tt.health {
t.Errorf("Expected %v, got %v", tt.health, health)
}
}
}
func TestFindPortByName(t *testing.T) {
container := api.Container{
Ports: []api.Port{
{
Name: "foo",
HostPort: 8080,
},
{
Name: "bar",
HostPort: 9000,
},
},
}
want := 8080
got := findPortByName(container, "foo")
if got != want {
t.Errorf("Expected %v, got %v", want, got)
}
}
func TestMuxHealthChecker(t *testing.T) {
muxHealthCheckerTests := []struct {
health Status
}{
// TODO: This test should run through a few different checker types.
{Healthy},
}
mc := &muxHealthChecker{
checkers: []HealthChecker{
&HTTPHealthChecker{client: &http.Client{}},
},
}
for _, muxHealthCheckerTest := range muxHealthCheckerTests {
tt := muxHealthCheckerTest
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
container := api.Container{
LivenessProbe: &api.LivenessProbe{
HTTPGet: &api.HTTPGetAction{},
},
}
container.LivenessProbe.HTTPGet.Port = util.NewIntOrStringFromString(port)
container.LivenessProbe.HTTPGet.Host = host
health, err := mc.HealthCheck("test", "", api.PodStatus{}, container)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if health != tt.health {
t.Errorf("Expected %v, got %v", tt.health, health)
}
}
}

View File

@@ -1,119 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
// HTTPGetInterface is an abstract interface for testability. It abstracts the interface of http.Client.Get.
// This is exported because some other packages may want to do direct HTTP checks.
type HTTPGetInterface interface {
Get(url string) (*http.Response, error)
}
// HTTPHealthChecker is an implementation of HealthChecker which checks container health by sending HTTP Get requests.
type HTTPHealthChecker struct {
client HTTPGetInterface
}
func NewHTTPHealthChecker(client *http.Client) HealthChecker {
return &HTTPHealthChecker{client: &http.Client{}}
}
// getURLParts parses the components of the target URL. For testability.
func getURLParts(status api.PodStatus, container api.Container) (string, int, string, error) {
params := container.LivenessProbe.HTTPGet
if params == nil {
return "", -1, "", fmt.Errorf("no HTTP parameters specified: %v", container)
}
port := -1
switch params.Port.Kind {
case util.IntstrInt:
port = params.Port.IntVal
case util.IntstrString:
port = findPortByName(container, params.Port.StrVal)
if port == -1 {
// Last ditch effort - maybe it was an int stored as string?
var err error
if port, err = strconv.Atoi(params.Port.StrVal); err != nil {
return "", -1, "", err
}
}
}
if port == -1 {
return "", -1, "", fmt.Errorf("unknown port: %v", params.Port)
}
var host string
if len(params.Host) > 0 {
host = params.Host
} else {
host = status.PodIP
}
return host, port, params.Path, nil
}
// formatURL formats a URL from args. For testability.
func formatURL(host string, port int, path string) string {
u := url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
Path: path,
}
return u.String()
}
// DoHTTPCheck checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Healthy.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Unhealthy.
// This is exported because some other packages may want to do direct HTTP checks.
func DoHTTPCheck(url string, client HTTPGetInterface) (Status, error) {
res, err := client.Get(url)
if err != nil {
glog.V(1).Infof("HTTP probe error: %v", err)
return Unhealthy, nil
}
defer res.Body.Close()
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
return Healthy, nil
}
glog.V(1).Infof("Health check failed for %s, Response: %v", url, *res)
return Unhealthy, nil
}
// HealthCheck checks if the container is healthy by trying sending HTTP Get requests to the container.
func (h *HTTPHealthChecker) HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (Status, error) {
host, port, path, err := getURLParts(status, container)
if err != nil {
return Unknown, err
}
return DoHTTPCheck(formatURL(host, port, path), h.client)
}
func (h *HTTPHealthChecker) CanCheck(probe *api.LivenessProbe) bool {
return probe.HTTPGet != nil
}

View File

@@ -1,88 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"fmt"
"net"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
type TCPHealthChecker struct{}
// getTCPAddrParts parses the components of a TCP connection address. For testability.
func getTCPAddrParts(status api.PodStatus, container api.Container) (string, int, error) {
params := container.LivenessProbe.TCPSocket
if params == nil {
return "", -1, fmt.Errorf("error, no TCP parameters specified: %v", container)
}
port := -1
switch params.Port.Kind {
case util.IntstrInt:
port = params.Port.IntVal
case util.IntstrString:
port = findPortByName(container, params.Port.StrVal)
if port == -1 {
// Last ditch effort - maybe it was an int stored as string?
var err error
if port, err = strconv.Atoi(params.Port.StrVal); err != nil {
return "", -1, err
}
}
}
if port == -1 {
return "", -1, fmt.Errorf("unknown port: %v", params.Port)
}
if len(status.PodIP) == 0 {
return "", -1, fmt.Errorf("no host specified.")
}
return status.PodIP, port, nil
}
// DoTCPCheck checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Healthy.
// If the socket fails to open, it returns Unhealthy.
// This is exported because some other packages may want to do direct TCP checks.
func DoTCPCheck(addr string) (Status, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return Unhealthy, nil
}
err = conn.Close()
if err != nil {
glog.Errorf("unexpected error closing health check socket: %v (%#v)", err, err)
}
return Healthy, nil
}
func (t *TCPHealthChecker) HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (Status, error) {
host, port, err := getTCPAddrParts(status, container)
if err != nil {
return Unknown, err
}
return DoTCPCheck(net.JoinHostPort(host, strconv.Itoa(port)))
}
func (t *TCPHealthChecker) CanCheck(probe *api.LivenessProbe) bool {
return probe.TCPSocket != nil
}

View File

@@ -1,115 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package health
import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestGetTCPAddrParts(t *testing.T) {
testCases := []struct {
probe *api.TCPSocketAction
ok bool
host string
port int
}{
{&api.TCPSocketAction{Port: util.NewIntOrStringFromInt(-1)}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("-1")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("not-found")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("found")}, true, "1.2.3.4", 93},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromInt(76)}, true, "1.2.3.4", 76},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("118")}, true, "1.2.3.4", 118},
}
for _, test := range testCases {
state := api.PodStatus{PodIP: "1.2.3.4"}
container := api.Container{
Ports: []api.Port{{Name: "found", HostPort: 93}},
LivenessProbe: &api.LivenessProbe{
TCPSocket: test.probe,
},
}
host, port, err := getTCPAddrParts(state, container)
if !test.ok && err == nil {
t.Errorf("Expected error for %+v, got %s:%d", test, host, port)
}
if test.ok && err != nil {
t.Errorf("Unexpected error: %v", err)
}
if test.ok {
if host != test.host || port != test.port {
t.Errorf("Expected %s:%d, got %s:%d", test.host, test.port, host, port)
}
}
}
}
func TestTcpHealthChecker(t *testing.T) {
tests := []struct {
probe *api.TCPSocketAction
expectedStatus Status
expectError bool
}{
// The probe will be filled in below. This is primarily testing that a connection is made.
{&api.TCPSocketAction{}, Healthy, false},
{&api.TCPSocketAction{}, Unhealthy, false},
{nil, Unknown, true},
}
checker := &TCPHealthChecker{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
for _, test := range tests {
container := api.Container{
LivenessProbe: &api.LivenessProbe{
TCPSocket: test.probe,
},
}
params := container.LivenessProbe.TCPSocket
if params != nil && test.expectedStatus == Healthy {
params.Port = util.NewIntOrStringFromString(port)
}
status, err := checker.HealthCheck("test", "", api.PodStatus{PodIP: host}, container)
if status != test.expectedStatus {
t.Errorf("expected: %v, got: %v", test.expectedStatus, status)
}
if err != nil && !test.expectError {
t.Errorf("unexpected error: %#v", err)
}
if err == nil && test.expectError {
t.Errorf("unexpected non-error.")
}
}
}

View File

@@ -36,11 +36,11 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -164,8 +164,6 @@ type Kubelet struct {
// Optional, no events will be sent without it // Optional, no events will be sent without it
etcdClient tools.EtcdClient etcdClient tools.EtcdClient
// Optional, defaults to simple implementaiton
healthChecker health.HealthChecker
// Optional, defaults to simple Docker implementation // Optional, defaults to simple Docker implementation
dockerPuller dockertools.DockerPuller dockerPuller dockertools.DockerPuller
// Optional, defaults to /logs/ from /var/log // Optional, defaults to /logs/ from /var/log
@@ -427,9 +425,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.dockerPuller == nil { if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
} }
if kl.healthChecker == nil {
kl.healthChecker = health.NewHealthChecker()
}
kl.syncLoop(updates, kl) kl.syncLoop(updates, kl)
} }
@@ -1038,13 +1033,13 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
// look for changes in the container. // look for changes in the container.
if hash == 0 || hash == expectedHash { if hash == 0 || hash == expectedHash {
// TODO: This should probably be separated out into a separate goroutine. // TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podFullName, uid, podStatus, container, dockerContainer) healthy, err := kl.probeLiveness(podFullName, uid, podStatus, container, dockerContainer)
if err != nil { if err != nil {
glog.V(1).Infof("health check errored: %v", err) glog.V(1).Infof("health check errored: %v", err)
containersToKeep[containerID] = empty{} containersToKeep[containerID] = empty{}
continue continue
} }
if healthy == health.Healthy { if healthy == probe.Success {
containersToKeep[containerID] = empty{} containersToKeep[containerID] = empty{}
continue continue
} }
@@ -1404,18 +1399,15 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
return podStatus, err return podStatus, err
} }
func (kl *Kubelet) healthy(podFullName string, podUID types.UID, status api.PodStatus, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) { func (kl *Kubelet) probeLiveness(podFullName string, podUID types.UID, status api.PodStatus, container api.Container, dockerContainer *docker.APIContainers) (probe.Status, error) {
// Give the container 60 seconds to start up. // Give the container 60 seconds to start up.
if container.LivenessProbe == nil { if container.LivenessProbe == nil {
return health.Healthy, nil return probe.Success, nil
} }
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds { if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
return health.Healthy, nil return probe.Success, nil
} }
if kl.healthChecker == nil { return kl.probeContainer(container.LivenessProbe, podFullName, podUID, status, container)
return health.Healthy, nil
}
return kl.healthChecker.HealthCheck(podFullName, podUID, status, container)
} }
// Returns logs of current machine. // Returns logs of current machine.

View File

@@ -31,7 +31,6 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
@@ -845,19 +844,8 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
} }
} }
type FalseHealthChecker struct{}
func (f *FalseHealthChecker) HealthCheck(podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (health.Status, error) {
return health.Unhealthy, nil
}
func (f *FalseHealthChecker) CanCheck(probe *api.LivenessProbe) bool {
return true
}
func TestSyncPodBadHash(t *testing.T) { func TestSyncPodBadHash(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker := newTestKubelet(t)
kubelet.healthChecker = &FalseHealthChecker{}
dockerContainers := dockertools.DockerContainers{ dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{ "1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container
@@ -904,7 +892,6 @@ func TestSyncPodBadHash(t *testing.T) {
func TestSyncPodUnhealthy(t *testing.T) { func TestSyncPodUnhealthy(t *testing.T) {
kubelet, fakeDocker := newTestKubelet(t) kubelet, fakeDocker := newTestKubelet(t)
kubelet.healthChecker = &FalseHealthChecker{}
dockerContainers := dockertools.DockerContainers{ dockerContainers := dockertools.DockerContainers{
"1234": &docker.APIContainers{ "1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container // the k8s prefix is required for the kubelet to manage the container

125
pkg/kubelet/probe.go Normal file
View File

@@ -0,0 +1,125 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
execprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/exec"
httprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/http"
tcprobe "github.com/GoogleCloudPlatform/kubernetes/pkg/probe/tcp"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
var (
execprober = execprobe.New()
httprober = httprobe.New()
tcprober = tcprobe.New()
)
func (kl *Kubelet) probeContainer(p *api.LivenessProbe, podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (probe.Status, error) {
if p.Exec != nil {
return execprober.Probe(kl.newExecInContainer(podFullName, podUID, container))
}
if p.HTTPGet != nil {
port, err := extractPort(p.HTTPGet.Port, container)
if err != nil {
return probe.Unknown, err
}
return httprober.Probe(extractGetParams(p.HTTPGet, status, port))
}
if p.TCPSocket != nil {
port, err := extractPort(p.TCPSocket.Port, container)
if err != nil {
return probe.Unknown, err
}
return tcprober.Probe(status.PodIP, port)
}
glog.Warningf("Failed to find probe builder for %s %+v", container.Name, container.LivenessProbe)
return probe.Unknown, nil
}
func extractGetParams(action *api.HTTPGetAction, status api.PodStatus, port int) (string, int, string) {
host := action.Host
if host == "" {
host = status.PodIP
}
return host, port, action.Path
}
func extractPort(param util.IntOrString, container api.Container) (int, error) {
port := -1
var err error
switch param.Kind {
case util.IntstrInt:
port := param.IntVal
if port > 0 && port < 65536 {
return port, nil
}
return port, fmt.Errorf("invalid port number: %v", port)
case util.IntstrString:
port = findPortByName(container, param.StrVal)
if port == -1 {
// Last ditch effort - maybe it was an int stored as string?
if port, err = strconv.Atoi(param.StrVal); err != nil {
return port, err
}
}
if port > 0 && port < 65536 {
return port, nil
}
return port, fmt.Errorf("invalid port number: %v", port)
default:
return port, fmt.Errorf("IntOrString had no kind: %+v", param)
}
}
// findPortByName is a helper function to look up a port in a container by name.
// Returns the HostPort if found, -1 if not found.
func findPortByName(container api.Container, portName string) int {
for _, port := range container.Ports {
if port.Name == portName {
return port.HostPort
}
}
return -1
}
type execInContainer struct {
run func() ([]byte, error)
}
func (kl *Kubelet) newExecInContainer(podFullName string, podUID types.UID, container api.Container) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return kl.RunInContainer(podFullName, podUID, container.Name, container.LivenessProbe.Exec.Command)
}}
}
func (eic execInContainer) CombinedOutput() ([]byte, error) {
return eic.run()
}
func (eic execInContainer) SetDir(dir string) {
//unimplemented
}

View File

@@ -1,5 +1,5 @@
/* /*
Copyright 2014 Google Inc. All rights reserved. Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@@ -14,19 +14,35 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package health package kubelet
import ( import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
func TestFindPortByName(t *testing.T) {
container := api.Container{
Ports: []api.Port{
{
Name: "foo",
HostPort: 8080,
},
{
Name: "bar",
HostPort: 9000,
},
},
}
want := 8080
got := findPortByName(container, "foo")
if got != want {
t.Errorf("Expected %v, got %v", want, got)
}
}
func TestGetURLParts(t *testing.T) { func TestGetURLParts(t *testing.T) {
testCases := []struct { testCases := []struct {
probe *api.HTTPGetAction probe *api.HTTPGetAction
@@ -53,13 +69,14 @@ func TestGetURLParts(t *testing.T) {
HTTPGet: test.probe, HTTPGet: test.probe,
}, },
} }
host, port, path, err := getURLParts(state, container) p, err := extractPort(test.probe.Port, container)
if !test.ok && err == nil {
t.Errorf("Expected error for %+v, got %s:%d/%s", test, host, port, path)
}
if test.ok && err != nil { if test.ok && err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
host, port, path := extractGetParams(test.probe, state, p)
if !test.ok && err == nil {
t.Errorf("Expected error for %+v, got %s:%d/%s", test, host, port, path)
}
if test.ok { if test.ok {
if host != test.host || port != test.port || path != test.path { if host != test.host || port != test.port || path != test.path {
t.Errorf("Expected %s:%d/%s, got %s:%d/%s", t.Errorf("Expected %s:%d/%s, got %s:%d/%s",
@@ -69,69 +86,41 @@ func TestGetURLParts(t *testing.T) {
} }
} }
func TestFormatURL(t *testing.T) { func TestGetTCPAddrParts(t *testing.T) {
testCases := []struct { testCases := []struct {
host string probe *api.TCPSocketAction
port int ok bool
path string host string
result string port int
}{ }{
{"localhost", 93, "", "http://localhost:93"}, {&api.TCPSocketAction{Port: util.NewIntOrStringFromInt(-1)}, false, "", -1},
{"localhost", 93, "/path", "http://localhost:93/path"}, {&api.TCPSocketAction{Port: util.NewIntOrStringFromString("")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("-1")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("not-found")}, false, "", -1},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("found")}, true, "1.2.3.4", 93},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromInt(76)}, true, "1.2.3.4", 76},
{&api.TCPSocketAction{Port: util.NewIntOrStringFromString("118")}, true, "1.2.3.4", 118},
} }
for _, test := range testCases {
url := formatURL(test.host, test.port, test.path)
if url != test.result {
t.Errorf("Expected %s, got %s", test.result, url)
}
}
}
func TestHTTPHealthChecker(t *testing.T) {
testCases := []struct {
probe *api.HTTPGetAction
status int
health Status
}{
// The probe will be filled in below. This is primarily testing that an HTTP GET happens.
{&api.HTTPGetAction{}, http.StatusOK, Healthy},
{&api.HTTPGetAction{}, -1, Unhealthy},
{nil, -1, Unknown},
}
hc := &HTTPHealthChecker{
client: &http.Client{},
}
for _, test := range testCases { for _, test := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { host := "1.2.3.4"
w.WriteHeader(test.status)
}))
u, err := url.Parse(ts.URL)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
container := api.Container{ container := api.Container{
Ports: []api.Port{{Name: "found", HostPort: 93}},
LivenessProbe: &api.LivenessProbe{ LivenessProbe: &api.LivenessProbe{
HTTPGet: test.probe, TCPSocket: test.probe,
}, },
} }
params := container.LivenessProbe.HTTPGet port, err := extractPort(test.probe.Port, container)
if params != nil { if !test.ok && err == nil {
params.Port = util.NewIntOrStringFromString(port) t.Errorf("Expected error for %+v, got %s:%d", test, host, port)
params.Host = host
} }
health, err := hc.HealthCheck("test", "", api.PodStatus{PodIP: host}, container) if test.ok && err != nil {
if test.health == Unknown && err == nil {
t.Errorf("Expected error")
}
if test.health != Unknown && err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if health != test.health { if test.ok {
t.Errorf("Expected %v, got %v", test.health, health) if host != test.host || port != test.port {
t.Errorf("Expected %s:%d, got %s:%d", test.host, test.port, host, port)
}
} }
} }
} }

View File

@@ -17,14 +17,12 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"net/http"
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@@ -46,15 +44,6 @@ func MonitorCAdvisor(k *Kubelet, cp uint) {
k.SetCadvisorClient(cadvisorClient) k.SetCadvisorClient(cadvisorClient)
} }
// TODO: move this into the kubelet itself
func InitHealthChecking(k *Kubelet) {
// TODO: These should probably become more plugin-ish: register a factory func
// in each checker's init(), iterate those here.
health.AddHealthChecker(health.NewExecHealthChecker(k))
health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{}))
health.AddHealthChecker(&health.TCPHealthChecker{})
}
// TODO: move this into a pkg/tools/etcd_tools // TODO: move this into a pkg/tools/etcd_tools
func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) tools.EtcdClient { func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) tools.EtcdClient {
if len(etcdServerList) > 0 { if len(etcdServerList) > 0 {

View File

@@ -1,5 +1,5 @@
/* /*
Copyright 2014 Google Inc. All rights reserved. Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// Package health contains utilities for health checking, as well as health status information. // Package probe contains utilities for health probing, as well as health status information.
package health package probe

46
pkg/probe/exec/exec.go Normal file
View File

@@ -0,0 +1,46 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
uexec "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
const defaultHealthyOutput = "ok"
func New() ExecProber {
return ExecProber{}
}
type ExecProber struct{}
func (pr ExecProber) Probe(e uexec.Cmd) (probe.Status, error) {
data, err := e.CombinedOutput()
glog.V(4).Infof("health check response: %s", string(data))
if err != nil {
return probe.Unknown, err
}
if strings.ToLower(string(data)) != defaultHealthyOutput {
return probe.Failure, nil
}
return probe.Success, nil
}

View File

@@ -0,0 +1,69 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
)
type FakeCmd struct {
out []byte
err error
}
func (f *FakeCmd) CombinedOutput() ([]byte, error) {
return f.out, f.err
}
func (f *FakeCmd) SetDir(dir string) {}
type healthCheckTest struct {
expectedStatus probe.Status
expectError bool
output []byte
err error
}
func TestExec(t *testing.T) {
prober := New()
fake := FakeCmd{}
tests := []healthCheckTest{
// Ok
{probe.Success, false, []byte("OK"), nil},
// Run returns error
{probe.Unknown, true, []byte("OK, NOT"), fmt.Errorf("test error")},
// Unhealthy
{probe.Failure, false, []byte("Fail"), nil},
}
for _, test := range tests {
fake.out = test.output
fake.err = test.err
status, err := prober.Probe(&fake)
if status != test.expectedStatus {
t.Errorf("expected %v, got %v", test.expectedStatus, status)
}
if err != nil && test.expectError == false {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectError == true {
t.Errorf("unexpected non-error")
}
}
}

73
pkg/probe/http/http.go Normal file
View File

@@ -0,0 +1,73 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package http
import (
"net"
"net/http"
"net/url"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/golang/glog"
)
func New() HTTPProber {
return HTTPProber{&http.Client{}}
}
type HTTPProber struct {
client HTTPGetInterface
}
// Probe returns a ProbeRunner capable of running an http check.
func (pr *HTTPProber) Probe(host string, port int, path string) (probe.Status, error) {
return DoHTTPProbe(formatURL(host, port, path), pr.client)
}
type HTTPGetInterface interface {
Get(u string) (*http.Response, error)
}
// DoHTTPProbe checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Healthy.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Unhealthy.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(url string, client HTTPGetInterface) (probe.Status, error) {
res, err := client.Get(url)
if err != nil {
glog.V(1).Infof("HTTP probe error: %v", err)
return probe.Failure, nil
}
defer res.Body.Close()
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
return probe.Success, nil
}
glog.V(1).Infof("Health check failed for %s, Response: %v", url, *res)
return probe.Failure, nil
}
// formatURL formats a URL from args. For testability.
func formatURL(host string, port int, path string) string {
u := url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
Path: path,
}
return u.String()
}

View File

@@ -0,0 +1,85 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package http
import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
)
func TestFormatURL(t *testing.T) {
testCases := []struct {
host string
port int
path string
result string
}{
{"localhost", 93, "", "http://localhost:93"},
{"localhost", 93, "/path", "http://localhost:93/path"},
}
for _, test := range testCases {
url := formatURL(test.host, test.port, test.path)
if url != test.result {
t.Errorf("Expected %s, got %s", test.result, url)
}
}
}
func TestHTTPProbeChecker(t *testing.T) {
prober := New()
testCases := []struct {
status int
health probe.Status
}{
// The probe will be filled in below. This is primarily testing that an HTTP GET happens.
{http.StatusOK, probe.Success},
{-1, probe.Failure},
}
for _, test := range testCases {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(test.status)
}))
u, err := url.Parse(ts.URL)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
p, err := strconv.Atoi(port)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
health, err := prober.Probe(host, p, "")
if test.health == probe.Unknown && err == nil {
t.Errorf("Expected error")
}
if test.health != probe.Unknown && err != nil {
t.Errorf("Unexpected error: %v", err)
}
if health != test.health {
t.Errorf("Expected %v, got %v", test.health, health)
}
}
}

37
pkg/probe/probe.go Normal file
View File

@@ -0,0 +1,37 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
type Status int
// Status values must be one of these constants.
const (
Success Status = iota
Failure
Unknown
)
func (s Status) String() string {
switch s {
case Success:
return "success"
case Failure:
return "failure"
default:
return "unknown"
}
}

52
pkg/probe/tcp/tcp.go Normal file
View File

@@ -0,0 +1,52 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tcp
import (
"net"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/golang/glog"
)
func New() TCPProber {
return TCPProber{}
}
type TCPProber struct{}
func (pr TCPProber) Probe(host string, port int) (probe.Status, error) {
return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)))
}
// DoTCPProbe checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Healthy.
// If the socket fails to open, it returns Unhealthy.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string) (probe.Status, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return probe.Failure, nil
}
err = conn.Close()
if err != nil {
glog.Errorf("unexpected error closing health check socket: %v (%#v)", err, err)
}
return probe.Success, nil
}

73
pkg/probe/tcp/tcp_test.go Normal file
View File

@@ -0,0 +1,73 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tcp
import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
)
func TestTcpHealthChecker(t *testing.T) {
prober := New()
tests := []struct {
expectedStatus probe.Status
usePort bool
expectError bool
}{
// The probe will be filled in below. This is primarily testing that a connection is made.
{probe.Success, true, false},
{probe.Failure, false, false},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
for _, test := range tests {
p, err := strconv.Atoi(port)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !test.usePort {
p = -1
}
status, err := prober.Probe(host, p)
if status != test.expectedStatus {
t.Errorf("expected: %v, got: %v", test.expectedStatus, status)
}
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectError {
t.Errorf("unexpected non-error.")
}
}
}

View File

@@ -22,8 +22,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@@ -115,7 +115,7 @@ func (r *HealthyRegistry) doCheck(key string) util.T {
case err != nil: case err != nil:
glog.V(2).Infof("HealthyRegistry: node %q health check error: %v", key, err) glog.V(2).Infof("HealthyRegistry: node %q health check error: %v", key, err)
nodeStatus = api.ConditionUnknown nodeStatus = api.ConditionUnknown
case status == health.Unhealthy: case status == probe.Failure:
nodeStatus = api.ConditionNone nodeStatus = api.ConditionNone
default: default:
nodeStatus = api.ConditionFull nodeStatus = api.ConditionFull

View File

@@ -22,15 +22,15 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
type alwaysYes struct{} type alwaysYes struct{}
func (alwaysYes) HealthCheck(host string) (health.Status, error) { func (alwaysYes) HealthCheck(host string) (probe.Status, error) {
return health.Healthy, nil return probe.Success, nil
} }
func TestBasicDelegation(t *testing.T) { func TestBasicDelegation(t *testing.T) {
@@ -75,11 +75,11 @@ type notMinion struct {
minion string minion string
} }
func (n *notMinion) HealthCheck(host string) (health.Status, error) { func (n *notMinion) HealthCheck(host string) (probe.Status, error) {
if host != n.minion { if host != n.minion {
return health.Healthy, nil return probe.Success, nil
} else { } else {
return health.Unhealthy, nil return probe.Failure, nil
} }
} }

View File

@@ -305,7 +305,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
go k.GarbageCollectLoop() go k.GarbageCollectLoop()
go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort) go kubelet.MonitorCAdvisor(k, kc.CAdvisorPort)
kubelet.InitHealthChecking(k)
return k, nil return k, nil
} }