Skip to content

Commit

Permalink
Fixed etcd join.
Browse files Browse the repository at this point in the history
  • Loading branch information
art2ip committed Jan 9, 2024
1 parent 7b97b23 commit 16098b1
Showing 1 changed file with 71 additions and 51 deletions.
122 changes: 71 additions & 51 deletions cmd/etcdsvr/etcdsvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import logging
import os
import random
import shlex
import signal
import socket
import subprocess
Expand Down Expand Up @@ -62,13 +63,27 @@ def ip_in_use(ip, port):
s.close()
return True

except socket.error as e:
except socket.error:
pass
except Exception as e:
except Exception:
pass

return False


def run_cmd(cmd, get_result=False):
    """Run a command line without a shell and optionally return its stdout.

    Args:
        cmd: Command line as a single string. It is tokenized with
            shlex.split and executed directly (no shell is involved, so
            pipes and redirections inside the string are not interpreted).
        get_result: When True, stdout and stderr of the child are captured
            through pipes and the captured stdout text is returned. When
            False, the child inherits this process's stdout/stderr.

    Returns:
        The captured stdout as text when get_result is True and the command
        exits with status 0; otherwise an empty string.
    """
    stream = subprocess.PIPE if get_result else None
    try:
        proc = subprocess.run(shlex.split(cmd), stdout=stream, stderr=stream,
                              universal_newlines=True, check=True)
    except subprocess.CalledProcessError:
        # Best effort: a non-zero exit status is reported as "no output".
        return ""

    # proc.stdout is None when nothing was captured; return "" in that case
    # rather than the literal string "None".
    return proc.stdout or ""

class Config():
# Init default.
def __init__(self):
Expand All @@ -95,8 +110,8 @@ def get_members(self):
cmd_list = "%s member list" % (etcd_cmd)
members = ""
try:
members = subprocess.check_output(cmd_list, shell=False)
except subprocess.CalledProcessError as e:
members = run_cmd(cmd_list, get_result=True)
except subprocess.CalledProcessError:
pass
return members

Expand All @@ -105,25 +120,23 @@ def display_status(self):

os.environ["ETCDCTL_API"] = "3"
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd)
cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)

out = etcd_cmd
out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=False)
print(out)
out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=False)
print(out)
out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=False)
print(out)
cmd_list = "%s member list" % (etcd_cmd)
cmd_status = "%s endpoint status" % (etcd_cmd)
cmd_health = "%s endpoint health" % (etcd_cmd)

print(etcd_cmd + "\n\n===== member list\n")
run_cmd(cmd_list)

print("\n===== endpoint status\n")
run_cmd(cmd_status)

print("\n===== endpoint health\n")
run_cmd(cmd_health)

# Join an existing cluster.
def join_cluster(self):

etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % (
etcd_cmd, self.etcd_name
)

cmd_add = "%s member add %s --peer-urls=%s" % (
etcd_cmd, self.etcd_name, self.peer_url
Expand All @@ -133,41 +146,48 @@ def join_cluster(self):

ok = True
err = None
resp = ">> Members:\n"
try:
os.environ["ETCDCTL_API"] = "3"
resp += self.get_members()
text = self.get_members()

hexid = ""

resp = ">> Members:\n" + text
print(resp)

# Remove the current entry if any
resp += "\n>> Select:\n%s\n\n" % (cmd_select)
hexid = subprocess.check_output(cmd_select, shell=False)
lines = text.split("\n")
for li in lines:
tokens = li.split(", ")
if len(tokens) > 3 and self.etcd_name == tokens[2]:
hexid = tokens[0]
break

if len(hexid) > 0:
cmd_remove = "%s member remove %s" % (etcd_cmd, hexid)
resp += "\n>> Remove:\n%s\n\n" % (cmd_remove)

resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=False)
cmd_remove = "%s member remove %s\n\n" % (etcd_cmd, hexid)
print("\n>> Remove:\n%s" % (cmd_remove))
resp += cmd_remove

run_cmd(cmd_remove)
sleep(5)

# Add a new entry
resp += "\n>> Add:\n%s\n\n" % (cmd_add)

resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=False)

resp += "\n>> Members:\n"
resp += self.get_members()
resp += "\n"

resp += cmd_rm
resp += "\n"
msg = "\n>> Add:\n%s\n\n" % (cmd_add)
print(msg)
resp += msg

run_cmd(cmd_add)
msg = "\n>> Members:\n" + self.get_members()
print(msg)
resp += msg

msg = "\n" + cmd_rm + "\n"
print(msg)
resp += msg

except subprocess.CalledProcessError as e:
err = e.output
ok = False

print(resp)
with open("join.log", "w+") as f:
f.write(resp)

Expand All @@ -189,7 +209,6 @@ def add_json_cfg(self):
h["advertise-client-urls"] = client_url
h["initial-advertise-peer-urls"] = self.peer_url

dir = self.etcd_name + ".etcd"
if self.is_existing_cluster:
# Join an existing cluster
h["initial-cluster-state"] = "existing"
Expand Down Expand Up @@ -339,23 +358,22 @@ def sig_handler(self, sig, frame):
def is_endpoint_healthy(self, wait_time):
os.environ["ETCDCTL_API"] = "3"
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.local_endpoint)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)
cmd_health = "%s endpoint health" % (etcd_cmd)
result = ""

now = int(time())
for i in range(50):
sleep(2)
for i in range(10):
sleep(5)
t = int(time()) - now
if t > wait_time:
break

if t > 60:
msg = "unhealthy_%s" % (self.etcd_name)
if t > 50:
msg = "unhealthy_%s retry ..." % (self.etcd_name)
self.logger.error("[MANAGER] %s" % (msg))

result = subprocess.check_output(cmd_health, shell=False)
health_check_bytes = str.encode("is healthy")
if health_check_bytes in result:
result = run_cmd(cmd_health, get_result=True)
if "is health" in result:
return True

self.logger.info("[MANAGER] %s" % (result))
Expand Down Expand Up @@ -418,8 +436,8 @@ def watch_and_recycle(self, cfg):
print(" ")
self.logger.info("[MANAGER] Started etcd process %d" % (self.pid))

wait_time = 85 + random.randint(0,10)
while False:
wait_time = 60 + random.randint(0,10)
while False: #not self.is_endpoint_healthy(wait_time):

if restartCount > 0:
self.shutdown_server()
Expand All @@ -436,6 +454,8 @@ def watch_and_recycle(self, cfg):
self.shutdown(-1) # exit

print("Starting etcd process %d succeeded." % (self.pid))
if os.path.exists("join.log"):
os.remove("join.log")
self.server.wait()

# etcd server has exited.
Expand Down Expand Up @@ -469,7 +489,7 @@ def watch_and_recycle(self, cfg):

with open("./etcdsvr.pid", "w") as f:
f.write("%d\n" % (os.getpid()))

cfg = Config()
err = cfg.parse_cfg(False)
if err:
Expand Down

0 comments on commit 16098b1

Please sign in to comment.