kubeadm: Reduce the backoff time of AddMember for etcd
This change optimizes the kubeadm/etcd `AddMember` client-side function by stopping early in the backoff loop when a peer conflict is found (indicating the member has already been added to the etcd cluster). In this situation, the function stops early and issues a `ListMembers` call to fetch the current list of members to return. With this optimization, front-loading a `ListMembers` call is no longer necessary, as `AddMember` now returns a functionally equivalent response. This helps reduce the time taken in cases where an initial client request to add a member is accepted by the server but fails client-side. This can happen, for example, if network latency causes the request to time out after it was sent and accepted by the cluster. In that case, the subsequent retries in the backoff loop would each fail with an `ErrPeerURLExist` response, and the call would be stuck until the backoff timeout was met (currently about 2min30sec). Testing Done: * Manual testing with an etcd cluster. The initial `AddMember` call was successful, and the etcd manifest file was identical to the prior version of the file. Subsequent calls to add the same member succeeded immediately (retaining idempotency), and the resulting manifest file remained identical to the previous version as well. The difference this time is that the call finished ~2min25sec faster in an identical test in the same environment.
This commit is contained in:
parent
db18331627
commit
c8431f42d9
@ -144,42 +144,19 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
|
||||
|
||||
etcdPeerAddress := etcdutil.GetPeerURL(endpoint)
|
||||
|
||||
klog.V(1).Infoln("[etcd] Getting the list of existing members")
|
||||
initialCluster, err := etcdClient.ListMembers()
|
||||
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
|
||||
var cluster []etcdutil.Member
|
||||
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// only add the new member if it doesn't already exists
|
||||
var exists bool
|
||||
klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress)
|
||||
for i := range initialCluster {
|
||||
if initialCluster[i].PeerURL == etcdPeerAddress {
|
||||
exists = true
|
||||
if len(initialCluster[i].Name) == 0 {
|
||||
klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName)
|
||||
initialCluster[i].Name = nodeName
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if exists {
|
||||
klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint)
|
||||
} else {
|
||||
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
|
||||
initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
|
||||
klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
|
||||
}
|
||||
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
|
||||
klog.V(1).Infof("Updated etcd member list: %v", cluster)
|
||||
|
||||
fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)
|
||||
|
||||
if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, initialCluster, isDryRun); err != nil {
|
||||
if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,8 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"google.golang.org/grpc"
|
||||
@ -339,7 +341,9 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// AddMember notifies an existing etcd cluster that a new member is joining
|
||||
// AddMember notifies an existing etcd cluster that a new member is joining, and
|
||||
// return the updated list of members. If the member has already been added to the
|
||||
// cluster, this will return the existing list of etcd members.
|
||||
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
|
||||
// Parse the peer address, required to add the client URL later to the list
|
||||
// of endpoints for this client. Parsing as a first operation to make sure that
|
||||
@ -350,8 +354,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
|
||||
}
|
||||
|
||||
// Adds a new member to the cluster
|
||||
var lastError error
|
||||
var resp *clientv3.MemberAddResponse
|
||||
var (
|
||||
lastError error
|
||||
respMembers []*etcdserverpb.Member
|
||||
)
|
||||
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: c.Endpoints,
|
||||
@ -368,11 +374,26 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
|
||||
defer cli.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
|
||||
defer cancel()
|
||||
var resp *clientv3.MemberAddResponse
|
||||
resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
|
||||
cancel()
|
||||
if err == nil {
|
||||
respMembers = resp.Members
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// If the error indicates that the peer already exists, exit early. In this situation, resp is nil, so
|
||||
// call out to MemberList to fetch all the members before returning.
|
||||
if errors.Is(err, rpctypes.ErrPeerURLExist) {
|
||||
klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members")
|
||||
var listResp *clientv3.MemberListResponse
|
||||
listResp, err = cli.MemberList(ctx)
|
||||
if err == nil {
|
||||
respMembers = listResp.Members
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(5).Infof("Failed to add etcd member: %v", err)
|
||||
lastError = err
|
||||
return false, nil
|
||||
@ -383,7 +404,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
|
||||
|
||||
// Returns the updated list of etcd members
|
||||
ret := []Member{}
|
||||
for _, m := range resp.Members {
|
||||
for _, m := range respMembers {
|
||||
// If the peer address matches, this is the member we are adding.
|
||||
// Use the name we passed to the function.
|
||||
if peerAddrs == m.PeerURLs[0] {
|
||||
|
1
go.mod
1
go.mod
@ -81,6 +81,7 @@ require (
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/vishvananda/netlink v1.1.0
|
||||
github.com/vmware/govmomi v0.20.3
|
||||
go.etcd.io/etcd/api/v3 v3.5.0
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.0
|
||||
go.etcd.io/etcd/client/v3 v3.5.0
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
|
||||
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -773,6 +773,7 @@ github.com/xlab/treeprint
|
||||
# go.etcd.io/bbolt v1.3.6 => go.etcd.io/bbolt v1.3.6
|
||||
go.etcd.io/bbolt
|
||||
# go.etcd.io/etcd/api/v3 v3.5.0 => go.etcd.io/etcd/api/v3 v3.5.0
|
||||
## explicit
|
||||
go.etcd.io/etcd/api/v3/authpb
|
||||
go.etcd.io/etcd/api/v3/etcdserverpb
|
||||
go.etcd.io/etcd/api/v3/etcdserverpb/gw
|
||||
|
Loading…
Reference in New Issue
Block a user