Add etcd to the list of services to validate.
Also add minions.
This commit is contained in:
parent
808be2d13b
commit
d7dc20fd6a
@ -84,6 +84,15 @@ func NewAPIGroup(storage map[string]RESTStorage, codec runtime.Codec, canonicalP
|
|||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func InstallValidator(mux Mux, servers map[string]Server) {
|
||||||
|
validator, err := NewValidator(servers)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("failed to set up validator: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mux.Handle("/validate", validator)
|
||||||
|
}
|
||||||
|
|
||||||
// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
|
// InstallREST registers the REST handlers (storage, watch, and operations) into a mux.
|
||||||
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
|
// It is expected that the provided prefix will serve all operations. Path MUST NOT end
|
||||||
// in a slash.
|
// in a slash.
|
||||||
@ -93,16 +102,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
|
|||||||
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
|
redirectHandler := &RedirectHandler{g.handler.storage, g.handler.codec}
|
||||||
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
|
opHandler := &OperationHandler{g.handler.ops, g.handler.codec}
|
||||||
|
|
||||||
servers := map[string]string{
|
|
||||||
"controller-manager": "127.0.0.1:10252",
|
|
||||||
"scheduler": "127.0.0.1:10251",
|
|
||||||
// TODO: Add minion health checks here too.
|
|
||||||
}
|
|
||||||
validator, err := NewValidator(servers)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("failed to set up validator: %v", err)
|
|
||||||
validator = nil
|
|
||||||
}
|
|
||||||
for _, prefix := range paths {
|
for _, prefix := range paths {
|
||||||
prefix = strings.TrimRight(prefix, "/")
|
prefix = strings.TrimRight(prefix, "/")
|
||||||
proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec}
|
proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec}
|
||||||
@ -112,9 +111,6 @@ func (g *APIGroup) InstallREST(mux Mux, paths ...string) {
|
|||||||
mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler))
|
mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler))
|
||||||
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
|
mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler))
|
||||||
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
|
mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler))
|
||||||
if validator != nil {
|
|
||||||
mux.Handle(prefix+"/validate", validator)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -33,20 +32,21 @@ type httpGet interface {
|
|||||||
Get(url string) (*http.Response, error)
|
Get(url string) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type server struct {
|
type Server struct {
|
||||||
addr string
|
Addr string
|
||||||
port int
|
Port int
|
||||||
|
Path string
|
||||||
}
|
}
|
||||||
|
|
||||||
// validator is responsible for validating the cluster and serving
|
// validator is responsible for validating the cluster and serving
|
||||||
type validator struct {
|
type validator struct {
|
||||||
// a list of servers to health check
|
// a list of servers to health check
|
||||||
servers map[string]server
|
servers map[string]Server
|
||||||
client httpGet
|
client httpGet
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) check(client httpGet) (health.Status, string, error) {
|
func (s *Server) check(client httpGet) (health.Status, string, error) {
|
||||||
resp, err := client.Get("http://" + net.JoinHostPort(s.addr, strconv.Itoa(s.port)) + "/healthz")
|
resp, err := client.Get("http://" + net.JoinHostPort(s.Addr, strconv.Itoa(s.Port)) + s.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return health.Unknown, "", err
|
return health.Unknown, "", err
|
||||||
}
|
}
|
||||||
@ -82,8 +82,7 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
reply = append(reply, ServerStatus{name, status.String(), status, msg, errorMsg})
|
reply = append(reply, ServerStatus{name, status.String(), status, msg, errorMsg})
|
||||||
}
|
}
|
||||||
data, err := json.Marshal(reply)
|
data, err := json.MarshalIndent(reply, "", " ")
|
||||||
log.Printf("FOO: %s", string(data))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
w.Write([]byte(err.Error()))
|
w.Write([]byte(err.Error()))
|
||||||
@ -94,8 +93,15 @@ func (v *validator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewValidator creates a validator for a set of servers.
|
// NewValidator creates a validator for a set of servers.
|
||||||
func NewValidator(servers map[string]string) (http.Handler, error) {
|
func NewValidator(servers map[string]Server) (http.Handler, error) {
|
||||||
result := map[string]server{}
|
return &validator{
|
||||||
|
servers: servers,
|
||||||
|
client: &http.Client{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
|
||||||
|
result := map[string]Server{}
|
||||||
for name, value := range servers {
|
for name, value := range servers {
|
||||||
host, port, err := net.SplitHostPort(value)
|
host, port, err := net.SplitHostPort(value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -105,16 +111,10 @@ func NewValidator(servers map[string]string) (http.Handler, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid server spec: %s (%v)", port, err)
|
return nil, fmt.Errorf("invalid server spec: %s (%v)", port, err)
|
||||||
}
|
}
|
||||||
result[name] = server{host, val}
|
result[name] = Server{Addr: host, Port: val, Path: "/healthz"}
|
||||||
}
|
}
|
||||||
return &validator{
|
|
||||||
servers: result,
|
|
||||||
client: &http.Client{},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeTestValidator(servers map[string]string, get httpGet) (http.Handler, error) {
|
v, e := NewValidator(result)
|
||||||
v, e := NewValidator(servers)
|
|
||||||
if e == nil {
|
if e == nil {
|
||||||
v.(*validator).client = get
|
v.(*validator).client = get
|
||||||
}
|
}
|
||||||
|
@ -63,12 +63,12 @@ func TestValidate(t *testing.T) {
|
|||||||
{nil, "foo", health.Unhealthy, 500, true},
|
{nil, "foo", health.Unhealthy, 500, true},
|
||||||
}
|
}
|
||||||
|
|
||||||
s := server{addr: "foo.com", port: 8080}
|
s := Server{Addr: "foo.com", Port: 8080, Path: "/healthz"}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
fake := makeFake(test.data, test.code, test.err)
|
fake := makeFake(test.data, test.code, test.err)
|
||||||
status, data, err := s.check(fake)
|
status, data, err := s.check(fake)
|
||||||
expect := fmt.Sprintf("http://%s:%d/healthz", s.addr, s.port)
|
expect := fmt.Sprintf("http://%s:%d/healthz", s.Addr, s.Port)
|
||||||
if fake.url != expect {
|
if fake.url != expect {
|
||||||
t.Errorf("expected %s, got %s", expect, fake.url)
|
t.Errorf("expected %s, got %s", expect, fake.url)
|
||||||
}
|
}
|
||||||
|
@ -17,12 +17,15 @@ limitations under the License.
|
|||||||
package master
|
package master
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
||||||
@ -297,6 +300,9 @@ func (m *Master) init(c *Config) {
|
|||||||
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
|
versionHandler := apiserver.APIVersionHandler("v1beta1", "v1beta2")
|
||||||
m.mux.Handle(c.APIPrefix, versionHandler)
|
m.mux.Handle(c.APIPrefix, versionHandler)
|
||||||
apiserver.InstallSupport(m.mux)
|
apiserver.InstallSupport(m.mux)
|
||||||
|
serversToValidate := m.getServersToValidate(c)
|
||||||
|
|
||||||
|
apiserver.InstallValidator(m.mux, serversToValidate)
|
||||||
if c.EnableLogsSupport {
|
if c.EnableLogsSupport {
|
||||||
apiserver.InstallLogsSupport(m.mux)
|
apiserver.InstallLogsSupport(m.mux)
|
||||||
}
|
}
|
||||||
@ -333,6 +339,43 @@ func (m *Master) init(c *Config) {
|
|||||||
m.masterServices.Start()
|
m.masterServices.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
|
||||||
|
serversToValidate := map[string]apiserver.Server{
|
||||||
|
"controller-manager": {Addr: "127.0.0.1", Port: 10252, Path: "/healthz"},
|
||||||
|
"scheduler": {Addr: "127.0.0.1", Port: 10251, Path: "/healthz"},
|
||||||
|
}
|
||||||
|
for ix, machine := range c.EtcdHelper.Client.GetCluster() {
|
||||||
|
etcdUrl, err := url.Parse(machine)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to parse etcd url for validation: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var port int
|
||||||
|
var addr string
|
||||||
|
if strings.Contains(etcdUrl.Host, ":") {
|
||||||
|
var portString string
|
||||||
|
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
port, _ = strconv.Atoi(portString)
|
||||||
|
} else {
|
||||||
|
addr = etcdUrl.Host
|
||||||
|
port = 4001
|
||||||
|
}
|
||||||
|
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/v2/keys/"}
|
||||||
|
}
|
||||||
|
nodes, err := m.minionRegistry.ListMinions(api.NewDefaultContext())
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to list minions: %v", err)
|
||||||
|
}
|
||||||
|
for ix, node := range nodes.Items {
|
||||||
|
serversToValidate[fmt.Sprintf("node-%d", ix)] = apiserver.Server{Addr: node.HostIP, Port: 10250, Path: "/healthz"}
|
||||||
|
}
|
||||||
|
return serversToValidate
|
||||||
|
}
|
||||||
|
|
||||||
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
||||||
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
|
func (m *Master) API_v1beta1() (map[string]apiserver.RESTStorage, runtime.Codec, string, runtime.SelfLinker) {
|
||||||
storage := make(map[string]apiserver.RESTStorage)
|
storage := make(map[string]apiserver.RESTStorage)
|
||||||
|
47
pkg/master/master_test.go
Normal file
47
pkg/master/master_test.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package master
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetServersToValidate(t *testing.T) {
|
||||||
|
master := Master{}
|
||||||
|
config := Config{}
|
||||||
|
fakeClient := tools.NewFakeEtcdClient(t)
|
||||||
|
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
|
||||||
|
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}
|
||||||
|
|
||||||
|
master.minionRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})
|
||||||
|
|
||||||
|
servers := master.getServersToValidate(&config)
|
||||||
|
|
||||||
|
if len(servers) != 7 {
|
||||||
|
t.Errorf("unexpected server list: %#v", servers)
|
||||||
|
}
|
||||||
|
for _, server := range []string{"scheduler", "controller-manager", "etcd-0", "etcd-1", "etcd-2", "node-0", "node-1"} {
|
||||||
|
if _, ok := servers[server]; !ok {
|
||||||
|
t.Errorf("server list missing: %s", server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -43,6 +43,7 @@ var (
|
|||||||
|
|
||||||
// EtcdClient is an injectable interface for testing.
|
// EtcdClient is an injectable interface for testing.
|
||||||
type EtcdClient interface {
|
type EtcdClient interface {
|
||||||
|
GetCluster() []string
|
||||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
@ -56,6 +57,7 @@ type EtcdClient interface {
|
|||||||
|
|
||||||
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
|
||||||
type EtcdGetSet interface {
|
type EtcdGetSet interface {
|
||||||
|
GetCluster() []string
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
@ -50,6 +50,7 @@ type FakeEtcdClient struct {
|
|||||||
TestIndex bool
|
TestIndex bool
|
||||||
ChangeIndex uint64
|
ChangeIndex uint64
|
||||||
LastSetTTL uint64
|
LastSetTTL uint64
|
||||||
|
Machines []string
|
||||||
|
|
||||||
// Will become valid after Watch is called; tester may write to it. Tester may
|
// Will become valid after Watch is called; tester may write to it. Tester may
|
||||||
// also read from it to verify that it's closed after injecting an error.
|
// also read from it to verify that it's closed after injecting an error.
|
||||||
@ -83,6 +84,10 @@ func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FakeEtcdClient) GetCluster() []string {
|
||||||
|
return f.Machines
|
||||||
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
|
func (f *FakeEtcdClient) ExpectNotFoundGet(key string) {
|
||||||
f.expectNotFoundGetSet[key] = struct{}{}
|
f.expectNotFoundGetSet[key] = struct{}{}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user