Merge pull request #104134 from ihgann/topic/ganni/optimize-kubeadm-etcd-member-add-2

kubeadm: reduce the backoff time of AddMember for etcd
This commit is contained in:
Kubernetes Prow Robot 2021-08-05 16:37:03 -07:00 committed by GitHub
commit de4e500673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 34 deletions

View File

@ -144,42 +144,19 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
etcdPeerAddress := etcdutil.GetPeerURL(endpoint)
klog.V(1).Infoln("[etcd] Getting the list of existing members")
initialCluster, err := etcdClient.ListMembers()
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
var cluster []etcdutil.Member
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if err != nil {
return err
}
// only add the new member if it doesn't already exists
var exists bool
klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress)
for i := range initialCluster {
if initialCluster[i].PeerURL == etcdPeerAddress {
exists = true
if len(initialCluster[i].Name) == 0 {
klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName)
initialCluster[i].Name = nodeName
}
break
}
}
if exists {
klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint)
} else {
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if err != nil {
return err
}
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
}
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", cluster)
fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)
if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, initialCluster, isDryRun); err != nil {
if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil {
return err
}

View File

@ -36,6 +36,8 @@ import (
"k8s.io/klog/v2"
"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
@ -339,7 +341,9 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
return ret, nil
}
// AddMember notifies an existing etcd cluster that a new member is joining
// AddMember notifies an existing etcd cluster that a new member is joining, and
// return the updated list of members. If the member has already been added to the
// cluster, this will return the existing list of etcd members.
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Parse the peer address, required to add the client URL later to the list
// of endpoints for this client. Parsing as a first operation to make sure that
@ -350,8 +354,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
}
// Adds a new member to the cluster
var lastError error
var resp *clientv3.MemberAddResponse
var (
lastError error
respMembers []*etcdserverpb.Member
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
@ -368,11 +374,26 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
var resp *clientv3.MemberAddResponse
resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
cancel()
if err == nil {
respMembers = resp.Members
return true, nil
}
// If the error indicates that the peer already exists, exit early. In this situation, resp is nil, so
// call out to MemberList to fetch all the members before returning.
if errors.Is(err, rpctypes.ErrPeerURLExist) {
klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members")
var listResp *clientv3.MemberListResponse
listResp, err = cli.MemberList(ctx)
if err == nil {
respMembers = listResp.Members
return true, nil
}
}
klog.V(5).Infof("Failed to add etcd member: %v", err)
lastError = err
return false, nil
@ -383,7 +404,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Returns the updated list of etcd members
ret := []Member{}
for _, m := range resp.Members {
for _, m := range respMembers {
// If the peer address matches, this is the member we are adding.
// Use the name we passed to the function.
if peerAddrs == m.PeerURLs[0] {

1
go.mod
View File

@ -80,6 +80,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/govmomi v0.20.3
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.opentelemetry.io/otel/sdk v0.20.0

1
vendor/modules.txt vendored
View File

@ -772,6 +772,7 @@ github.com/xlab/treeprint
# go.etcd.io/bbolt v1.3.6 => go.etcd.io/bbolt v1.3.6
go.etcd.io/bbolt
# go.etcd.io/etcd/api/v3 v3.5.0 => go.etcd.io/etcd/api/v3 v3.5.0
## explicit
go.etcd.io/etcd/api/v3/authpb
go.etcd.io/etcd/api/v3/etcdserverpb
go.etcd.io/etcd/api/v3/etcdserverpb/gw