Merge pull request #39821 from stu-gott/apiserver-checketcd
Automatic merge from submit-queue (batch tested with PRs 41931, 39821, 41841, 42197, 42195) Apiserver: wait for Etcd to become available on startup fixes #37704
This commit is contained in:
@@ -16,6 +16,7 @@ go_library(
|
|||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//cmd/kube-apiserver/app/options:go_default_library",
|
"//cmd/kube-apiserver/app/options:go_default_library",
|
||||||
|
"//cmd/kube-apiserver/app/preflight:go_default_library",
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
"//pkg/apis/apps:go_default_library",
|
"//pkg/apis/apps:go_default_library",
|
||||||
"//pkg/apis/batch:go_default_library",
|
"//pkg/apis/batch:go_default_library",
|
||||||
@@ -86,6 +87,7 @@ filegroup(
|
|||||||
srcs = [
|
srcs = [
|
||||||
":package-srcs",
|
":package-srcs",
|
||||||
"//cmd/kube-apiserver/app/options:all-srcs",
|
"//cmd/kube-apiserver/app/options:all-srcs",
|
||||||
|
"//cmd/kube-apiserver/app/preflight:all-srcs",
|
||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
)
|
)
|
||||||
|
36
cmd/kube-apiserver/app/preflight/BUILD
Normal file
36
cmd/kube-apiserver/app/preflight/BUILD
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package(default_visibility = ["//visibility:public"])
|
||||||
|
|
||||||
|
licenses(["notice"])
|
||||||
|
|
||||||
|
load(
|
||||||
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
|
"go_library",
|
||||||
|
"go_test",
|
||||||
|
)
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["checks.go"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["checks_test.go"],
|
||||||
|
library = ":go_default_library",
|
||||||
|
tags = ["automanaged"],
|
||||||
|
deps = ["//vendor:k8s.io/apimachinery/pkg/util/wait"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
)
|
68
cmd/kube-apiserver/app/preflight/checks.go
Normal file
68
cmd/kube-apiserver/app/preflight/checks.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package preflight
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const connectionTimeout = 1 * time.Second
|
||||||
|
|
||||||
|
type connection interface {
|
||||||
|
serverReachable(address string) bool
|
||||||
|
parseServerList(serverList []string) error
|
||||||
|
CheckEtcdServers() (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type EtcdConnection struct {
|
||||||
|
ServerList []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (EtcdConnection) serverReachable(address string) bool {
|
||||||
|
if conn, err := net.DialTimeout("tcp", address, connectionTimeout); err == nil {
|
||||||
|
defer conn.Close()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseServerURI(serverURI string) (string, error) {
|
||||||
|
connUrl, err := url.Parse(serverURI)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("unable to parse etcd url: %v", err)
|
||||||
|
}
|
||||||
|
return connUrl.Host, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckEtcdServers will attempt to reach all etcd servers once. If any
|
||||||
|
// can be reached, return true.
|
||||||
|
func (con EtcdConnection) CheckEtcdServers() (done bool, err error) {
|
||||||
|
// Attempt to reach every Etcd server in order
|
||||||
|
for _, serverUri := range con.ServerList {
|
||||||
|
host, err := parseServerURI(serverUri)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if con.serverReachable(host) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
96
cmd/kube-apiserver/app/preflight/checks_test.go
Normal file
96
cmd/kube-apiserver/app/preflight/checks_test.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package preflight
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseServerURIGood(t *testing.T) {
|
||||||
|
host, err := parseServerURI("https://127.0.0.1:2379")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reference := "127.0.0.1:2379"
|
||||||
|
if host != reference {
|
||||||
|
t.Fatal("server uri was not parsed correctly")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseServerURIBad(t *testing.T) {
|
||||||
|
_, err := parseServerURI("-invalid uri$@#%")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected bad uri to raise parse error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEtcdConnection(t *testing.T) {
|
||||||
|
etcd := new(EtcdConnection)
|
||||||
|
|
||||||
|
result := etcd.serverReachable("-not a real network address-")
|
||||||
|
if result {
|
||||||
|
t.Fatal("checkConnection should not have succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckEtcdServersEmpty(t *testing.T) {
|
||||||
|
etcd := new(EtcdConnection)
|
||||||
|
result, err := etcd.CheckEtcdServers()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if result {
|
||||||
|
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckEtcdServersUri(t *testing.T) {
|
||||||
|
etcd := new(EtcdConnection)
|
||||||
|
etcd.ServerList = []string{"-invalid uri$@#%"}
|
||||||
|
result, err := etcd.CheckEtcdServers()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected bad uri to raise parse error")
|
||||||
|
}
|
||||||
|
if result {
|
||||||
|
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckEtcdServers(t *testing.T) {
|
||||||
|
etcd := new(EtcdConnection)
|
||||||
|
etcd.ServerList = []string{""}
|
||||||
|
result, err := etcd.CheckEtcdServers()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if result {
|
||||||
|
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPollCheckServer(t *testing.T) {
|
||||||
|
err := utilwait.PollImmediate(1*time.Microsecond,
|
||||||
|
2*time.Microsecond,
|
||||||
|
EtcdConnection{ServerList: []string{""}}.CheckEtcdServers)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected check to time out")
|
||||||
|
}
|
||||||
|
}
|
@@ -43,11 +43,13 @@ import (
|
|||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/admission"
|
"k8s.io/apiserver/pkg/admission"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/apiserver/pkg/server/filters"
|
"k8s.io/apiserver/pkg/server/filters"
|
||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||||
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/preflight"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/apis/apps"
|
"k8s.io/kubernetes/pkg/apis/apps"
|
||||||
"k8s.io/kubernetes/pkg/apis/batch"
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
@@ -68,6 +70,9 @@ import (
|
|||||||
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
|
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const etcdRetryLimit = 60
|
||||||
|
const etcdRetryInterval = 1 * time.Second
|
||||||
|
|
||||||
// NewAPIServerCommand creates a *cobra.Command object with default parameters
|
// NewAPIServerCommand creates a *cobra.Command object with default parameters
|
||||||
func NewAPIServerCommand() *cobra.Command {
|
func NewAPIServerCommand() *cobra.Command {
|
||||||
s := options.NewServerRunOptions()
|
s := options.NewServerRunOptions()
|
||||||
@@ -152,6 +157,9 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
|
|||||||
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
if err := s.Features.ApplyTo(genericConfig); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
if err := utilwait.PollImmediate(etcdRetryInterval, etcdRetryLimit*etcdRetryInterval, preflight.EtcdConnection{ServerList: s.Etcd.StorageConfig.ServerList}.CheckEtcdServers); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("error waiting for etcd connection: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Use protobufs for self-communication.
|
// Use protobufs for self-communication.
|
||||||
// Since not every generic apiserver has to support protobufs, we
|
// Since not every generic apiserver has to support protobufs, we
|
||||||
|
Reference in New Issue
Block a user