Skip to content

Commit

Permalink
Merge pull request #104134 from ihgann/topic/ganni/optimize-kubeadm-e…
Browse files Browse the repository at this point in the history
…tcd-member-add-2

kubeadm: reduce the backoff time of AddMember for etcd
  • Loading branch information
k8s-ci-robot authored Aug 5, 2021
2 parents 05ee896 + c8431f4 commit de4e500
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 34 deletions.
35 changes: 6 additions & 29 deletions cmd/kubeadm/app/phases/etcd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,42 +144,19 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest

etcdPeerAddress := etcdutil.GetPeerURL(endpoint)

klog.V(1).Infoln("[etcd] Getting the list of existing members")
initialCluster, err := etcdClient.ListMembers()
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
var cluster []etcdutil.Member
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if err != nil {
return err
}

// only add the new member if it doesn't already exists
var exists bool
klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress)
for i := range initialCluster {
if initialCluster[i].PeerURL == etcdPeerAddress {
exists = true
if len(initialCluster[i].Name) == 0 {
klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName)
initialCluster[i].Name = nodeName
}
break
}
}

if exists {
klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint)
} else {
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
if err != nil {
return err
}

fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
}
fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
klog.V(1).Infof("Updated etcd member list: %v", cluster)

fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)

if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, initialCluster, isDryRun); err != nil {
if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil {
return err
}

Expand Down
31 changes: 26 additions & 5 deletions cmd/kubeadm/app/util/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"k8s.io/klog/v2"

"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
Expand Down Expand Up @@ -339,7 +341,9 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
return ret, nil
}

// AddMember notifies an existing etcd cluster that a new member is joining
// AddMember notifies an existing etcd cluster that a new member is joining, and
// return the updated list of members. If the member has already been added to the
// cluster, this will return the existing list of etcd members.
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Parse the peer address, required to add the client URL later to the list
// of endpoints for this client. Parsing as a first operation to make sure that
Expand All @@ -350,8 +354,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
}

// Adds a new member to the cluster
var lastError error
var resp *clientv3.MemberAddResponse
var (
lastError error
respMembers []*etcdserverpb.Member
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
Expand All @@ -368,11 +374,26 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
var resp *clientv3.MemberAddResponse
resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
cancel()
if err == nil {
respMembers = resp.Members
return true, nil
}

// If the error indicates that the peer already exists, exit early. In this situation, resp is nil, so
// call out to MemberList to fetch all the members before returning.
if errors.Is(err, rpctypes.ErrPeerURLExist) {
klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members")
var listResp *clientv3.MemberListResponse
listResp, err = cli.MemberList(ctx)
if err == nil {
respMembers = listResp.Members
return true, nil
}
}

klog.V(5).Infof("Failed to add etcd member: %v", err)
lastError = err
return false, nil
Expand All @@ -383,7 +404,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {

// Returns the updated list of etcd members
ret := []Member{}
for _, m := range resp.Members {
for _, m := range respMembers {
// If the peer address matches, this is the member we are adding.
// Use the name we passed to the function.
if peerAddrs == m.PeerURLs[0] {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/govmomi v0.20.3
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.opentelemetry.io/otel/sdk v0.20.0
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ github.com/xlab/treeprint
# go.etcd.io/bbolt v1.3.6 => go.etcd.io/bbolt v1.3.6
go.etcd.io/bbolt
# go.etcd.io/etcd/api/v3 v3.5.0 => go.etcd.io/etcd/api/v3 v3.5.0
## explicit
go.etcd.io/etcd/api/v3/authpb
go.etcd.io/etcd/api/v3/etcdserverpb
go.etcd.io/etcd/api/v3/etcdserverpb/gw
Expand Down

0 comments on commit de4e500

Please sign in to comment.