Version bump to etcd v3.2.13
This commit is contained in:
60
vendor/github.com/coreos/etcd/embed/BUILD
generated
vendored
Normal file
60
vendor/github.com/coreos/etcd/embed/BUILD
generated
vendored
Normal file
@@ -0,0 +1,60 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"config.go",
|
||||
"doc.go",
|
||||
"etcd.go",
|
||||
"serve.go",
|
||||
"util.go",
|
||||
],
|
||||
importpath = "github.com/coreos/etcd/embed",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/github.com/cockroachdb/cmux:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/etcdhttp:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v2http:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3client:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3election:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb/gw:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3lock:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb/gw:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/etcdserverpb/gw:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/cors:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/debugutil:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/netutil:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/runtime:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/srv:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/types:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/rafthttp:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/wal:go_default_library",
|
||||
"//vendor/github.com/coreos/pkg/capnslog:go_default_library",
|
||||
"//vendor/github.com/ghodss/yaml:go_default_library",
|
||||
"//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
"//vendor/golang.org/x/net/trace:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
"//vendor/google.golang.org/grpc/credentials:go_default_library",
|
||||
"//vendor/google.golang.org/grpc/keepalive:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
464
vendor/github.com/coreos/etcd/embed/config.go
generated
vendored
Normal file
464
vendor/github.com/coreos/etcd/embed/config.go
generated
vendored
Normal file
@@ -0,0 +1,464 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/netutil"
|
||||
"github.com/coreos/etcd/pkg/srv"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
ClusterStateFlagNew = "new"
|
||||
ClusterStateFlagExisting = "existing"
|
||||
|
||||
DefaultName = "default"
|
||||
DefaultMaxSnapshots = 5
|
||||
DefaultMaxWALs = 5
|
||||
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
|
||||
DefaultGRPCKeepAliveMinTime = 5 * time.Second
|
||||
DefaultGRPCKeepAliveInterval = 2 * time.Hour
|
||||
DefaultGRPCKeepAliveTimeout = 20 * time.Second
|
||||
|
||||
DefaultListenPeerURLs = "http://localhost:2380"
|
||||
DefaultListenClientURLs = "http://localhost:2379"
|
||||
|
||||
// maxElectionMs specifies the maximum value of election timeout.
|
||||
// More details are listed in ../Documentation/tuning.md#time-parameters.
|
||||
maxElectionMs = 50000
|
||||
)
|
||||
|
||||
var (
|
||||
ErrConflictBootstrapFlags = fmt.Errorf("multiple discovery or bootstrap flags are set. " +
|
||||
"Choose one of \"initial-cluster\", \"discovery\" or \"discovery-srv\"")
|
||||
ErrUnsetAdvertiseClientURLsFlag = fmt.Errorf("--advertise-client-urls is required when --listen-client-urls is set explicitly")
|
||||
|
||||
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
|
||||
DefaultAdvertiseClientURLs = "http://localhost:2379"
|
||||
|
||||
defaultHostname string
|
||||
defaultHostStatus error
|
||||
)
|
||||
|
||||
func init() {
|
||||
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
|
||||
}
|
||||
|
||||
// Config holds the arguments for configuring an etcd server.
|
||||
type Config struct {
|
||||
// member
|
||||
|
||||
CorsInfo *cors.CORSInfo
|
||||
LPUrls, LCUrls []url.URL
|
||||
Dir string `json:"data-dir"`
|
||||
WalDir string `json:"wal-dir"`
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
Name string `json:"name"`
|
||||
SnapCount uint64 `json:"snapshot-count"`
|
||||
AutoCompactionRetention int `json:"auto-compaction-retention"`
|
||||
|
||||
// TickMs is the number of milliseconds between heartbeat ticks.
|
||||
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
|
||||
// make ticks a cluster wide configuration.
|
||||
TickMs uint `json:"heartbeat-interval"`
|
||||
ElectionMs uint `json:"election-timeout"`
|
||||
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
|
||||
MaxRequestBytes uint `json:"max-request-bytes"`
|
||||
|
||||
// gRPC server options
|
||||
|
||||
// GRPCKeepAliveMinTime is the minimum interval that a client should
|
||||
// wait before pinging server. When client pings "too fast", server
|
||||
// sends goaway and closes the connection (errors: too_many_pings,
|
||||
// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
|
||||
// Server expects client pings only when there is any active streams
|
||||
// (PermitWithoutStream is set false).
|
||||
GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
|
||||
// GRPCKeepAliveInterval is the frequency of server-to-client ping
|
||||
// to check if a connection is alive. Close a non-responsive connection
|
||||
// after an additional duration of Timeout. 0 to disable.
|
||||
GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
|
||||
// GRPCKeepAliveTimeout is the additional duration of wait
|
||||
// before closing a non-responsive connection. 0 to disable.
|
||||
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`
|
||||
|
||||
// clustering
|
||||
|
||||
APUrls, ACUrls []url.URL
|
||||
ClusterState string `json:"initial-cluster-state"`
|
||||
DNSCluster string `json:"discovery-srv"`
|
||||
Dproxy string `json:"discovery-proxy"`
|
||||
Durl string `json:"discovery"`
|
||||
InitialCluster string `json:"initial-cluster"`
|
||||
InitialClusterToken string `json:"initial-cluster-token"`
|
||||
StrictReconfigCheck bool `json:"strict-reconfig-check"`
|
||||
EnableV2 bool `json:"enable-v2"`
|
||||
|
||||
// security
|
||||
|
||||
ClientTLSInfo transport.TLSInfo
|
||||
ClientAutoTLS bool
|
||||
PeerTLSInfo transport.TLSInfo
|
||||
PeerAutoTLS bool
|
||||
|
||||
// debug
|
||||
|
||||
Debug bool `json:"debug"`
|
||||
LogPkgLevels string `json:"log-package-levels"`
|
||||
EnablePprof bool `json:"enable-pprof"`
|
||||
Metrics string `json:"metrics"`
|
||||
|
||||
// ForceNewCluster starts a new cluster even if previously started; unsafe.
|
||||
ForceNewCluster bool `json:"force-new-cluster"`
|
||||
|
||||
// UserHandlers is for registering users handlers and only used for
|
||||
// embedding etcd into other applications.
|
||||
// The map key is the route path for the handler, and
|
||||
// you must ensure it can't be conflicted with etcd's.
|
||||
UserHandlers map[string]http.Handler `json:"-"`
|
||||
// ServiceRegister is for registering users' gRPC services. A simple usage example:
|
||||
// cfg := embed.NewConfig()
|
||||
// cfg.ServerRegister = func(s *grpc.Server) {
|
||||
// pb.RegisterFooServer(s, &fooServer{})
|
||||
// pb.RegisterBarServer(s, &barServer{})
|
||||
// }
|
||||
// embed.StartEtcd(cfg)
|
||||
ServiceRegister func(*grpc.Server) `json:"-"`
|
||||
|
||||
// auth
|
||||
|
||||
AuthToken string `json:"auth-token"`
|
||||
}
|
||||
|
||||
// configYAML holds the config suitable for yaml parsing
|
||||
type configYAML struct {
|
||||
Config
|
||||
configJSON
|
||||
}
|
||||
|
||||
// configJSON has file options that are translated into Config options
|
||||
type configJSON struct {
|
||||
LPUrlsJSON string `json:"listen-peer-urls"`
|
||||
LCUrlsJSON string `json:"listen-client-urls"`
|
||||
CorsJSON string `json:"cors"`
|
||||
APUrlsJSON string `json:"initial-advertise-peer-urls"`
|
||||
ACUrlsJSON string `json:"advertise-client-urls"`
|
||||
ClientSecurityJSON securityConfig `json:"client-transport-security"`
|
||||
PeerSecurityJSON securityConfig `json:"peer-transport-security"`
|
||||
}
|
||||
|
||||
type securityConfig struct {
|
||||
CAFile string `json:"ca-file"`
|
||||
CertFile string `json:"cert-file"`
|
||||
KeyFile string `json:"key-file"`
|
||||
CertAuth bool `json:"client-cert-auth"`
|
||||
TrustedCAFile string `json:"trusted-ca-file"`
|
||||
AutoTLS bool `json:"auto-tls"`
|
||||
}
|
||||
|
||||
// NewConfig creates a new Config populated with default values.
|
||||
func NewConfig() *Config {
|
||||
lpurl, _ := url.Parse(DefaultListenPeerURLs)
|
||||
apurl, _ := url.Parse(DefaultInitialAdvertisePeerURLs)
|
||||
lcurl, _ := url.Parse(DefaultListenClientURLs)
|
||||
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
|
||||
cfg := &Config{
|
||||
CorsInfo: &cors.CORSInfo{},
|
||||
MaxSnapFiles: DefaultMaxSnapshots,
|
||||
MaxWalFiles: DefaultMaxWALs,
|
||||
Name: DefaultName,
|
||||
SnapCount: etcdserver.DefaultSnapCount,
|
||||
MaxRequestBytes: DefaultMaxRequestBytes,
|
||||
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
|
||||
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
|
||||
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,
|
||||
TickMs: 100,
|
||||
ElectionMs: 1000,
|
||||
LPUrls: []url.URL{*lpurl},
|
||||
LCUrls: []url.URL{*lcurl},
|
||||
APUrls: []url.URL{*apurl},
|
||||
ACUrls: []url.URL{*acurl},
|
||||
ClusterState: ClusterStateFlagNew,
|
||||
InitialClusterToken: "etcd-cluster",
|
||||
StrictReconfigCheck: true,
|
||||
Metrics: "basic",
|
||||
EnableV2: true,
|
||||
AuthToken: "simple",
|
||||
}
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
return cfg
|
||||
}
|
||||
|
||||
func ConfigFromFile(path string) (*Config, error) {
|
||||
cfg := &configYAML{Config: *NewConfig()}
|
||||
if err := cfg.configFromFile(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cfg.Config, nil
|
||||
}
|
||||
|
||||
func (cfg *configYAML) configFromFile(path string) error {
|
||||
b, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defaultInitialCluster := cfg.InitialCluster
|
||||
|
||||
err = yaml.Unmarshal(b, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cfg.LPUrlsJSON != "" {
|
||||
u, err := types.NewURLs(strings.Split(cfg.LPUrlsJSON, ","))
|
||||
if err != nil {
|
||||
plog.Fatalf("unexpected error setting up listen-peer-urls: %v", err)
|
||||
}
|
||||
cfg.LPUrls = []url.URL(u)
|
||||
}
|
||||
|
||||
if cfg.LCUrlsJSON != "" {
|
||||
u, err := types.NewURLs(strings.Split(cfg.LCUrlsJSON, ","))
|
||||
if err != nil {
|
||||
plog.Fatalf("unexpected error setting up listen-client-urls: %v", err)
|
||||
}
|
||||
cfg.LCUrls = []url.URL(u)
|
||||
}
|
||||
|
||||
if cfg.CorsJSON != "" {
|
||||
if err := cfg.CorsInfo.Set(cfg.CorsJSON); err != nil {
|
||||
plog.Panicf("unexpected error setting up cors: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.APUrlsJSON != "" {
|
||||
u, err := types.NewURLs(strings.Split(cfg.APUrlsJSON, ","))
|
||||
if err != nil {
|
||||
plog.Fatalf("unexpected error setting up initial-advertise-peer-urls: %v", err)
|
||||
}
|
||||
cfg.APUrls = []url.URL(u)
|
||||
}
|
||||
|
||||
if cfg.ACUrlsJSON != "" {
|
||||
u, err := types.NewURLs(strings.Split(cfg.ACUrlsJSON, ","))
|
||||
if err != nil {
|
||||
plog.Fatalf("unexpected error setting up advertise-peer-urls: %v", err)
|
||||
}
|
||||
cfg.ACUrls = []url.URL(u)
|
||||
}
|
||||
|
||||
// If a discovery flag is set, clear default initial cluster set by InitialClusterFromName
|
||||
if (cfg.Durl != "" || cfg.DNSCluster != "") && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = ""
|
||||
}
|
||||
if cfg.ClusterState == "" {
|
||||
cfg.ClusterState = ClusterStateFlagNew
|
||||
}
|
||||
|
||||
copySecurityDetails := func(tls *transport.TLSInfo, ysc *securityConfig) {
|
||||
tls.CAFile = ysc.CAFile
|
||||
tls.CertFile = ysc.CertFile
|
||||
tls.KeyFile = ysc.KeyFile
|
||||
tls.ClientCertAuth = ysc.CertAuth
|
||||
tls.TrustedCAFile = ysc.TrustedCAFile
|
||||
}
|
||||
copySecurityDetails(&cfg.ClientTLSInfo, &cfg.ClientSecurityJSON)
|
||||
copySecurityDetails(&cfg.PeerTLSInfo, &cfg.PeerSecurityJSON)
|
||||
cfg.ClientAutoTLS = cfg.ClientSecurityJSON.AutoTLS
|
||||
cfg.PeerAutoTLS = cfg.PeerSecurityJSON.AutoTLS
|
||||
|
||||
return cfg.Validate()
|
||||
}
|
||||
|
||||
func (cfg *Config) Validate() error {
|
||||
if err := checkBindURLs(cfg.LPUrls); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := checkBindURLs(cfg.LCUrls); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if conflicting flags are passed.
|
||||
nSet := 0
|
||||
for _, v := range []bool{cfg.Durl != "", cfg.InitialCluster != "", cfg.DNSCluster != ""} {
|
||||
if v {
|
||||
nSet++
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.ClusterState != ClusterStateFlagNew && cfg.ClusterState != ClusterStateFlagExisting {
|
||||
return fmt.Errorf("unexpected clusterState %q", cfg.ClusterState)
|
||||
}
|
||||
|
||||
if nSet > 1 {
|
||||
return ErrConflictBootstrapFlags
|
||||
}
|
||||
|
||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||
return fmt.Errorf("--election-timeout[%vms] should be at least as 5 times as --heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
|
||||
}
|
||||
if cfg.ElectionMs > maxElectionMs {
|
||||
return fmt.Errorf("--election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
|
||||
}
|
||||
|
||||
// check this last since proxying in etcdmain may make this OK
|
||||
if cfg.LCUrls != nil && cfg.ACUrls == nil {
|
||||
return ErrUnsetAdvertiseClientURLsFlag
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PeerURLsMapAndToken sets up an initial peer URLsMap and cluster token for bootstrap or discovery.
|
||||
func (cfg *Config) PeerURLsMapAndToken(which string) (urlsmap types.URLsMap, token string, err error) {
|
||||
token = cfg.InitialClusterToken
|
||||
switch {
|
||||
case cfg.Durl != "":
|
||||
urlsmap = types.URLsMap{}
|
||||
// If using discovery, generate a temporary cluster based on
|
||||
// self's advertised peer URLs
|
||||
urlsmap[cfg.Name] = cfg.APUrls
|
||||
token = cfg.Durl
|
||||
case cfg.DNSCluster != "":
|
||||
clusterStrs, cerr := srv.GetCluster("etcd-server", cfg.Name, cfg.DNSCluster, cfg.APUrls)
|
||||
if cerr != nil {
|
||||
plog.Errorf("couldn't resolve during SRV discovery (%v)", cerr)
|
||||
return nil, "", cerr
|
||||
}
|
||||
for _, s := range clusterStrs {
|
||||
plog.Noticef("got bootstrap from DNS for etcd-server at %s", s)
|
||||
}
|
||||
clusterStr := strings.Join(clusterStrs, ",")
|
||||
if strings.Contains(clusterStr, "https://") && cfg.PeerTLSInfo.CAFile == "" {
|
||||
cfg.PeerTLSInfo.ServerName = cfg.DNSCluster
|
||||
}
|
||||
urlsmap, err = types.NewURLsMap(clusterStr)
|
||||
// only etcd member must belong to the discovered cluster.
|
||||
// proxy does not need to belong to the discovered cluster.
|
||||
if which == "etcd" {
|
||||
if _, ok := urlsmap[cfg.Name]; !ok {
|
||||
return nil, "", fmt.Errorf("cannot find local etcd member %q in SRV records", cfg.Name)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// We're statically configured, and cluster has appropriately been set.
|
||||
urlsmap, err = types.NewURLsMap(cfg.InitialCluster)
|
||||
}
|
||||
return urlsmap, token, err
|
||||
}
|
||||
|
||||
func (cfg Config) InitialClusterFromName(name string) (ret string) {
|
||||
if len(cfg.APUrls) == 0 {
|
||||
return ""
|
||||
}
|
||||
n := name
|
||||
if name == "" {
|
||||
n = DefaultName
|
||||
}
|
||||
for i := range cfg.APUrls {
|
||||
ret = ret + "," + n + "=" + cfg.APUrls[i].String()
|
||||
}
|
||||
return ret[1:]
|
||||
}
|
||||
|
||||
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
|
||||
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||
|
||||
func (cfg Config) defaultPeerHost() bool {
|
||||
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
|
||||
}
|
||||
|
||||
func (cfg Config) defaultClientHost() bool {
|
||||
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
|
||||
}
|
||||
|
||||
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
|
||||
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
|
||||
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
|
||||
// then the advertise peer host would be updated with machine's default host,
|
||||
// while keeping the listen URL's port.
|
||||
// User can work around this by explicitly setting URL with 127.0.0.1.
|
||||
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
|
||||
// TODO: check whether fields are set instead of whether fields have default value
|
||||
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
|
||||
if defaultHostname == "" || defaultHostStatus != nil {
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
return "", defaultHostStatus
|
||||
}
|
||||
|
||||
used := false
|
||||
pip, pport := cfg.LPUrls[0].Hostname(), cfg.LPUrls[0].Port()
|
||||
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
|
||||
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
|
||||
used = true
|
||||
}
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
|
||||
cip, cport := cfg.LCUrls[0].Hostname(), cfg.LCUrls[0].Port()
|
||||
if cfg.defaultClientHost() && cip == "0.0.0.0" {
|
||||
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
|
||||
used = true
|
||||
}
|
||||
dhost := defaultHostname
|
||||
if !used {
|
||||
dhost = ""
|
||||
}
|
||||
return dhost, defaultHostStatus
|
||||
}
|
||||
|
||||
// checkBindURLs returns an error if any URL uses a domain name.
|
||||
// TODO: return error in 3.2.0
|
||||
func checkBindURLs(urls []url.URL) error {
|
||||
for _, url := range urls {
|
||||
if url.Scheme == "unix" || url.Scheme == "unixs" {
|
||||
continue
|
||||
}
|
||||
host, _, err := net.SplitHostPort(url.Host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if host == "localhost" {
|
||||
// special case for local address
|
||||
// TODO: support /etc/hosts ?
|
||||
continue
|
||||
}
|
||||
if net.ParseIP(host) == nil {
|
||||
return fmt.Errorf("expected IP in URL for binding (%s)", url.String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
45
vendor/github.com/coreos/etcd/embed/doc.go
generated
vendored
Normal file
45
vendor/github.com/coreos/etcd/embed/doc.go
generated
vendored
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package embed provides bindings for embedding an etcd server in a program.
|
||||
|
||||
Launch an embedded etcd server using the configuration defaults:
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/embed"
|
||||
)
|
||||
|
||||
func main() {
|
||||
cfg := embed.NewConfig()
|
||||
cfg.Dir = "default.etcd"
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer e.Close()
|
||||
select {
|
||||
case <-e.Server.ReadyNotify():
|
||||
log.Printf("Server is ready!")
|
||||
case <-time.After(60 * time.Second):
|
||||
e.Server.Stop() // trigger a shutdown
|
||||
log.Printf("Server took too long to start!")
|
||||
}
|
||||
log.Fatal(<-e.Err())
|
||||
}
|
||||
*/
|
||||
package embed
|
||||
509
vendor/github.com/coreos/etcd/embed/etcd.go
generated
vendored
Normal file
509
vendor/github.com/coreos/etcd/embed/etcd.go
generated
vendored
Normal file
@@ -0,0 +1,509 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
defaultLog "log"
|
||||
"net"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/debugutil"
|
||||
runtimeutil "github.com/coreos/etcd/pkg/runtime"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
|
||||
"github.com/cockroachdb/cmux"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
|
||||
|
||||
const (
|
||||
// internal fd usage includes disk usage and transport usage.
|
||||
// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
|
||||
// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
|
||||
// read all logs after some snapshot index, which locates at the end of
|
||||
// the second last and the head of the last. For purging, it needs to read
|
||||
// directory, so it needs 1. For fd monitor, it needs 1.
|
||||
// For transport, rafthttp builds two long-polling connections and at most
|
||||
// four temporary connections with each member. There are at most 9 members
|
||||
// in a cluster, so it should reserve 96.
|
||||
// For the safety, we set the total reserved number to 150.
|
||||
reservedInternalFDNum = 150
|
||||
)
|
||||
|
||||
// Etcd contains a running etcd server and its listeners.
|
||||
type Etcd struct {
|
||||
Peers []*peerListener
|
||||
Clients []net.Listener
|
||||
// a map of contexts for the servers that serves client requests.
|
||||
sctxs map[string]*serveCtx
|
||||
|
||||
Server *etcdserver.EtcdServer
|
||||
|
||||
cfg Config
|
||||
stopc chan struct{}
|
||||
errc chan error
|
||||
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
type peerListener struct {
|
||||
net.Listener
|
||||
serve func() error
|
||||
close func(context.Context) error
|
||||
}
|
||||
|
||||
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
|
||||
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
|
||||
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
|
||||
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
if err = inCfg.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serving := false
|
||||
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
|
||||
cfg := &e.cfg
|
||||
defer func() {
|
||||
if e == nil || err == nil {
|
||||
return
|
||||
}
|
||||
if !serving {
|
||||
// errored before starting gRPC server for serveCtx.serversC
|
||||
for _, sctx := range e.sctxs {
|
||||
close(sctx.serversC)
|
||||
}
|
||||
}
|
||||
e.Close()
|
||||
e = nil
|
||||
}()
|
||||
|
||||
if e.Peers, err = startPeerListeners(cfg); err != nil {
|
||||
return e, err
|
||||
}
|
||||
if e.sctxs, err = startClientListeners(cfg); err != nil {
|
||||
return e, err
|
||||
}
|
||||
for _, sctx := range e.sctxs {
|
||||
e.Clients = append(e.Clients, sctx.l)
|
||||
}
|
||||
|
||||
var (
|
||||
urlsmap types.URLsMap
|
||||
token string
|
||||
)
|
||||
|
||||
if !isMemberInitialized(cfg) {
|
||||
urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
|
||||
if err != nil {
|
||||
return e, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
srvcfg := &etcdserver.ServerConfig{
|
||||
Name: cfg.Name,
|
||||
ClientURLs: cfg.ACUrls,
|
||||
PeerURLs: cfg.APUrls,
|
||||
DataDir: cfg.Dir,
|
||||
DedicatedWALDir: cfg.WalDir,
|
||||
SnapCount: cfg.SnapCount,
|
||||
MaxSnapFiles: cfg.MaxSnapFiles,
|
||||
MaxWALFiles: cfg.MaxWalFiles,
|
||||
InitialPeerURLsMap: urlsmap,
|
||||
InitialClusterToken: token,
|
||||
DiscoveryURL: cfg.Durl,
|
||||
DiscoveryProxy: cfg.Dproxy,
|
||||
NewCluster: cfg.IsNewCluster(),
|
||||
ForceNewCluster: cfg.ForceNewCluster,
|
||||
PeerTLSInfo: cfg.PeerTLSInfo,
|
||||
TickMs: cfg.TickMs,
|
||||
ElectionTicks: cfg.ElectionTicks(),
|
||||
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||
StrictReconfigCheck: cfg.StrictReconfigCheck,
|
||||
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
|
||||
AuthToken: cfg.AuthToken,
|
||||
Debug: cfg.Debug,
|
||||
}
|
||||
|
||||
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
|
||||
return e, err
|
||||
}
|
||||
|
||||
// buffer channel so goroutines on closed connections won't wait forever
|
||||
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
|
||||
|
||||
e.Server.Start()
|
||||
|
||||
if err = e.servePeers(); err != nil {
|
||||
return e, err
|
||||
}
|
||||
if err = e.serveClients(); err != nil {
|
||||
return e, err
|
||||
}
|
||||
|
||||
serving = true
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Config returns the current configuration.
|
||||
func (e *Etcd) Config() Config {
|
||||
return e.cfg
|
||||
}
|
||||
|
||||
// Close gracefully shuts down all servers/listeners.
|
||||
// Client requests will be terminated with request timeout.
|
||||
// After timeout, enforce remaning requests be closed immediately.
|
||||
func (e *Etcd) Close() {
|
||||
e.closeOnce.Do(func() { close(e.stopc) })
|
||||
|
||||
// close client requests with request timeout
|
||||
timeout := 2 * time.Second
|
||||
if e.Server != nil {
|
||||
timeout = e.Server.Cfg.ReqTimeout()
|
||||
}
|
||||
for _, sctx := range e.sctxs {
|
||||
for ss := range sctx.serversC {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
stopServers(ctx, ss)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
for _, sctx := range e.sctxs {
|
||||
sctx.cancel()
|
||||
}
|
||||
|
||||
for i := range e.Clients {
|
||||
if e.Clients[i] != nil {
|
||||
e.Clients[i].Close()
|
||||
}
|
||||
}
|
||||
|
||||
// close rafthttp transports
|
||||
if e.Server != nil {
|
||||
e.Server.Stop()
|
||||
}
|
||||
|
||||
// close all idle connections in peer handler (wait up to 1-second)
|
||||
for i := range e.Peers {
|
||||
if e.Peers[i] != nil && e.Peers[i].close != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
e.Peers[i].close(ctx)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func stopServers(ctx context.Context, ss *servers) {
|
||||
shutdownNow := func() {
|
||||
// first, close the http.Server
|
||||
ss.http.Shutdown(ctx)
|
||||
// then close grpc.Server; cancels all active RPCs
|
||||
ss.grpc.Stop()
|
||||
}
|
||||
|
||||
// do not grpc.Server.GracefulStop with TLS enabled etcd server
|
||||
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
|
||||
// and https://github.com/coreos/etcd/issues/8916
|
||||
if ss.secure {
|
||||
shutdownNow()
|
||||
return
|
||||
}
|
||||
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
// close listeners to stop accepting new connections,
|
||||
// will block on any existing transports
|
||||
ss.grpc.GracefulStop()
|
||||
}()
|
||||
|
||||
// wait until all pending RPCs are finished
|
||||
select {
|
||||
case <-ch:
|
||||
case <-ctx.Done():
|
||||
// took too long, manually close open transports
|
||||
// e.g. watch streams
|
||||
shutdownNow()
|
||||
|
||||
// concurrent GracefulStop should be interrupted
|
||||
<-ch
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Etcd) Err() <-chan error { return e.errc }
|
||||
|
||||
func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
|
||||
if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() {
|
||||
phosts := make([]string, len(cfg.LPUrls))
|
||||
for i, u := range cfg.LPUrls {
|
||||
phosts[i] = u.Host
|
||||
}
|
||||
cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts)
|
||||
if err != nil {
|
||||
plog.Fatalf("could not get certs (%v)", err)
|
||||
}
|
||||
} else if cfg.PeerAutoTLS {
|
||||
plog.Warningf("ignoring peer auto TLS since certs given")
|
||||
}
|
||||
|
||||
if !cfg.PeerTLSInfo.Empty() {
|
||||
plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
|
||||
}
|
||||
|
||||
peers = make([]*peerListener, len(cfg.LPUrls))
|
||||
defer func() {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
for i := range peers {
|
||||
if peers[i] != nil && peers[i].close != nil {
|
||||
plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
peers[i].close(ctx)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i, u := range cfg.LPUrls {
|
||||
if u.Scheme == "http" {
|
||||
if !cfg.PeerTLSInfo.Empty() {
|
||||
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
||||
}
|
||||
if cfg.PeerTLSInfo.ClientCertAuth {
|
||||
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
||||
}
|
||||
}
|
||||
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
|
||||
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// once serve, overwrite with 'http.Server.Shutdown'
|
||||
peers[i].close = func(context.Context) error {
|
||||
return peers[i].Listener.Close()
|
||||
}
|
||||
plog.Info("listening for peers on ", u.String())
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
// configure peer handlers after rafthttp.Transport started
|
||||
func (e *Etcd) servePeers() (err error) {
|
||||
ph := etcdhttp.NewPeerHandler(e.Server)
|
||||
var peerTLScfg *tls.Config
|
||||
if !e.cfg.PeerTLSInfo.Empty() {
|
||||
if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range e.Peers {
|
||||
gs := v3rpc.Server(e.Server, peerTLScfg)
|
||||
m := cmux.New(p.Listener)
|
||||
go gs.Serve(m.Match(cmux.HTTP2()))
|
||||
srv := &http.Server{
|
||||
Handler: grpcHandlerFunc(gs, ph),
|
||||
ReadTimeout: 5 * time.Minute,
|
||||
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
|
||||
}
|
||||
go srv.Serve(m.Match(cmux.Any()))
|
||||
p.serve = func() error { return m.Serve() }
|
||||
p.close = func(ctx context.Context) error {
|
||||
// gracefully shutdown http.Server
|
||||
// close open listeners, idle connections
|
||||
// until context cancel or time-out
|
||||
stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// start peer servers in a goroutine
|
||||
for _, pl := range e.Peers {
|
||||
go func(l *peerListener) {
|
||||
e.errHandler(l.serve())
|
||||
}(pl)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
|
||||
if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() {
|
||||
chosts := make([]string, len(cfg.LCUrls))
|
||||
for i, u := range cfg.LCUrls {
|
||||
chosts[i] = u.Host
|
||||
}
|
||||
cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts)
|
||||
if err != nil {
|
||||
plog.Fatalf("could not get certs (%v)", err)
|
||||
}
|
||||
} else if cfg.ClientAutoTLS {
|
||||
plog.Warningf("ignoring client auto TLS since certs given")
|
||||
}
|
||||
|
||||
if cfg.EnablePprof {
|
||||
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
|
||||
}
|
||||
|
||||
sctxs = make(map[string]*serveCtx)
|
||||
for _, u := range cfg.LCUrls {
|
||||
sctx := newServeCtx()
|
||||
|
||||
if u.Scheme == "http" || u.Scheme == "unix" {
|
||||
if !cfg.ClientTLSInfo.Empty() {
|
||||
plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
|
||||
}
|
||||
if cfg.ClientTLSInfo.ClientCertAuth {
|
||||
plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
|
||||
}
|
||||
}
|
||||
if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
|
||||
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
|
||||
}
|
||||
|
||||
proto := "tcp"
|
||||
addr := u.Host
|
||||
if u.Scheme == "unix" || u.Scheme == "unixs" {
|
||||
proto = "unix"
|
||||
addr = u.Host + u.Path
|
||||
}
|
||||
|
||||
sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
|
||||
sctx.insecure = !sctx.secure
|
||||
if oldctx := sctxs[addr]; oldctx != nil {
|
||||
oldctx.secure = oldctx.secure || sctx.secure
|
||||
oldctx.insecure = oldctx.insecure || sctx.insecure
|
||||
continue
|
||||
}
|
||||
|
||||
if sctx.l, err = net.Listen(proto, addr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
|
||||
// hosts that disable ipv6. So, use the address given by the user.
|
||||
sctx.addr = addr
|
||||
|
||||
if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
|
||||
if fdLimit <= reservedInternalFDNum {
|
||||
plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
|
||||
}
|
||||
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
|
||||
}
|
||||
|
||||
if proto == "tcp" {
|
||||
if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
plog.Info("listening for client requests on ", u.Host)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
sctx.l.Close()
|
||||
plog.Info("stopping listening for client requests on ", u.Host)
|
||||
}
|
||||
}()
|
||||
for k := range cfg.UserHandlers {
|
||||
sctx.userHandlers[k] = cfg.UserHandlers[k]
|
||||
}
|
||||
sctx.serviceRegister = cfg.ServiceRegister
|
||||
if cfg.EnablePprof || cfg.Debug {
|
||||
sctx.registerPprof()
|
||||
}
|
||||
if cfg.Debug {
|
||||
sctx.registerTrace()
|
||||
}
|
||||
sctxs[addr] = sctx
|
||||
}
|
||||
return sctxs, nil
|
||||
}
|
||||
|
||||
func (e *Etcd) serveClients() (err error) {
|
||||
var ctlscfg *tls.Config
|
||||
if !e.cfg.ClientTLSInfo.Empty() {
|
||||
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
|
||||
if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if e.cfg.CorsInfo.String() != "" {
|
||||
plog.Infof("cors = %s", e.cfg.CorsInfo)
|
||||
}
|
||||
|
||||
// Start a client server goroutine for each listen address
|
||||
var h http.Handler
|
||||
if e.Config().EnableV2 {
|
||||
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
|
||||
} else {
|
||||
mux := http.NewServeMux()
|
||||
etcdhttp.HandleBasic(mux, e.Server)
|
||||
h = mux
|
||||
}
|
||||
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
|
||||
|
||||
gopts := []grpc.ServerOption{}
|
||||
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
|
||||
gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
||||
MinTime: e.cfg.GRPCKeepAliveMinTime,
|
||||
PermitWithoutStream: false,
|
||||
}))
|
||||
}
|
||||
if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
|
||||
e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
|
||||
gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
Time: e.cfg.GRPCKeepAliveInterval,
|
||||
Timeout: e.cfg.GRPCKeepAliveTimeout,
|
||||
}))
|
||||
}
|
||||
|
||||
// start client servers in a goroutine
|
||||
for _, sctx := range e.sctxs {
|
||||
go func(s *serveCtx) {
|
||||
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler, gopts...))
|
||||
}(sctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Etcd) errHandler(err error) {
|
||||
select {
|
||||
case <-e.stopc:
|
||||
return
|
||||
default:
|
||||
}
|
||||
select {
|
||||
case <-e.stopc:
|
||||
case e.errc <- err:
|
||||
}
|
||||
}
|
||||
244
vendor/github.com/coreos/etcd/embed/serve.go
generated
vendored
Normal file
244
vendor/github.com/coreos/etcd/embed/serve.go
generated
vendored
Normal file
@@ -0,0 +1,244 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"io/ioutil"
|
||||
defaultLog "log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3client"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3election"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
|
||||
v3electiongw "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb/gw"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3lock"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
|
||||
v3lockgw "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb/gw"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
etcdservergw "github.com/coreos/etcd/etcdserver/etcdserverpb/gw"
|
||||
"github.com/coreos/etcd/pkg/debugutil"
|
||||
|
||||
"github.com/cockroachdb/cmux"
|
||||
gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
type serveCtx struct {
|
||||
l net.Listener
|
||||
addr string
|
||||
secure bool
|
||||
insecure bool
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
userHandlers map[string]http.Handler
|
||||
serviceRegister func(*grpc.Server)
|
||||
serversC chan *servers
|
||||
}
|
||||
|
||||
type servers struct {
|
||||
secure bool
|
||||
grpc *grpc.Server
|
||||
http *http.Server
|
||||
}
|
||||
|
||||
func newServeCtx() *serveCtx {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &serveCtx{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
userHandlers: make(map[string]http.Handler),
|
||||
serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
|
||||
}
|
||||
}
|
||||
|
||||
// serve accepts incoming connections on the listener l,
|
||||
// creating a new service goroutine for each. The service goroutines
|
||||
// read requests and then call handler to reply to them.
|
||||
func (sctx *serveCtx) serve(
|
||||
s *etcdserver.EtcdServer,
|
||||
tlscfg *tls.Config,
|
||||
handler http.Handler,
|
||||
errHandler func(error),
|
||||
gopts ...grpc.ServerOption) error {
|
||||
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
|
||||
<-s.ReadyNotify()
|
||||
plog.Info("ready to serve client requests")
|
||||
|
||||
m := cmux.New(sctx.l)
|
||||
v3c := v3client.New(s)
|
||||
servElection := v3election.NewElectionServer(v3c)
|
||||
servLock := v3lock.NewLockServer(v3c)
|
||||
|
||||
if sctx.insecure {
|
||||
gs := v3rpc.Server(s, nil, gopts...)
|
||||
v3electionpb.RegisterElectionServer(gs, servElection)
|
||||
v3lockpb.RegisterLockServer(gs, servLock)
|
||||
if sctx.serviceRegister != nil {
|
||||
sctx.serviceRegister(gs)
|
||||
}
|
||||
grpcl := m.Match(cmux.HTTP2())
|
||||
go func() { errHandler(gs.Serve(grpcl)) }()
|
||||
|
||||
opts := []grpc.DialOption{grpc.WithInsecure()}
|
||||
gwmux, err := sctx.registerGateway(opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
httpmux := sctx.createMux(gwmux, handler)
|
||||
|
||||
srvhttp := &http.Server{
|
||||
Handler: httpmux,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
httpl := m.Match(cmux.HTTP1())
|
||||
go func() { errHandler(srvhttp.Serve(httpl)) }()
|
||||
|
||||
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
|
||||
plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
|
||||
}
|
||||
|
||||
if sctx.secure {
|
||||
gs := v3rpc.Server(s, tlscfg, gopts...)
|
||||
v3electionpb.RegisterElectionServer(gs, servElection)
|
||||
v3lockpb.RegisterLockServer(gs, servLock)
|
||||
if sctx.serviceRegister != nil {
|
||||
sctx.serviceRegister(gs)
|
||||
}
|
||||
handler = grpcHandlerFunc(gs, handler)
|
||||
|
||||
dtls := tlscfg.Clone()
|
||||
// trust local server
|
||||
dtls.InsecureSkipVerify = true
|
||||
creds := credentials.NewTLS(dtls)
|
||||
opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
|
||||
gwmux, err := sctx.registerGateway(opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tlsl := tls.NewListener(m.Match(cmux.Any()), tlscfg)
|
||||
// TODO: add debug flag; enable logging when debug flag is set
|
||||
httpmux := sctx.createMux(gwmux, handler)
|
||||
|
||||
srv := &http.Server{
|
||||
Handler: httpmux,
|
||||
TLSConfig: tlscfg,
|
||||
ErrorLog: logger, // do not log user error
|
||||
}
|
||||
go func() { errHandler(srv.Serve(tlsl)) }()
|
||||
|
||||
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
|
||||
plog.Infof("serving client requests on %s", sctx.l.Addr().String())
|
||||
}
|
||||
|
||||
close(sctx.serversC)
|
||||
return m.Serve()
|
||||
}
|
||||
|
||||
// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
|
||||
// connections or otherHandler otherwise. Copied from cockroachdb.
|
||||
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
|
||||
if otherHandler == nil {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
grpcServer.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
grpcServer.ServeHTTP(w, r)
|
||||
} else {
|
||||
otherHandler.ServeHTTP(w, r)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
|
||||
|
||||
func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
|
||||
ctx := sctx.ctx
|
||||
conn, err := grpc.DialContext(ctx, sctx.addr, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gwmux := gw.NewServeMux()
|
||||
|
||||
handlers := []registerHandlerFunc{
|
||||
etcdservergw.RegisterKVHandler,
|
||||
etcdservergw.RegisterWatchHandler,
|
||||
etcdservergw.RegisterLeaseHandler,
|
||||
etcdservergw.RegisterClusterHandler,
|
||||
etcdservergw.RegisterMaintenanceHandler,
|
||||
etcdservergw.RegisterAuthHandler,
|
||||
v3lockgw.RegisterLockHandler,
|
||||
v3electiongw.RegisterElectionHandler,
|
||||
}
|
||||
for _, h := range handlers {
|
||||
if err := h(ctx, gwmux, conn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
if cerr := conn.Close(); cerr != nil {
|
||||
plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr)
|
||||
}
|
||||
}()
|
||||
|
||||
return gwmux, nil
|
||||
}
|
||||
|
||||
func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
|
||||
httpmux := http.NewServeMux()
|
||||
for path, h := range sctx.userHandlers {
|
||||
httpmux.Handle(path, h)
|
||||
}
|
||||
|
||||
httpmux.Handle("/v3alpha/", gwmux)
|
||||
if handler != nil {
|
||||
httpmux.Handle("/", handler)
|
||||
}
|
||||
return httpmux
|
||||
}
|
||||
|
||||
func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
|
||||
if sctx.userHandlers[s] != nil {
|
||||
plog.Warningf("path %s already registered by user handler", s)
|
||||
return
|
||||
}
|
||||
sctx.userHandlers[s] = h
|
||||
}
|
||||
|
||||
func (sctx *serveCtx) registerPprof() {
|
||||
for p, h := range debugutil.PProfHandlers() {
|
||||
sctx.registerUserHandler(p, h)
|
||||
}
|
||||
}
|
||||
|
||||
func (sctx *serveCtx) registerTrace() {
|
||||
reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
|
||||
sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
|
||||
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
|
||||
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
|
||||
}
|
||||
30
vendor/github.com/coreos/etcd/embed/util.go
generated
vendored
Normal file
30
vendor/github.com/coreos/etcd/embed/util.go
generated
vendored
Normal file
@@ -0,0 +1,30 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package embed
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/coreos/etcd/wal"
|
||||
)
|
||||
|
||||
func isMemberInitialized(cfg *Config) bool {
|
||||
waldir := cfg.WalDir
|
||||
if waldir == "" {
|
||||
waldir = filepath.Join(cfg.Dir, "member", "wal")
|
||||
}
|
||||
|
||||
return wal.Exist(waldir)
|
||||
}
|
||||
Reference in New Issue
Block a user