YT-19048: Add test for location lost on master
don-dron committed May 5, 2023
1 parent 7d3a84b commit b20dc76
Showing 1 changed file with 103 additions and 1 deletion.
yt/yt/tests/integration/node/test_hot_swap.py
@@ -1,6 +1,6 @@
 import logging
 
-from yt_env_setup import YTEnvSetup
+from yt_env_setup import YTEnvSetup, Restarter, NODES_SERVICE
 
 from yt_commands import (
     authors, wait, create_access_control_object_namespace,
@@ -10,8 +10,19 @@
     write_file, disable_chunk_locations,
     resurrect_chunk_locations)
 
+import time
+
+from yt.common import YtError
+
+from os import listdir
+from os.path import isfile, join
+
+import io
+try:
+    import zstd
+except ImportError:
+    import zstandard as zstd
+
 ##################################################################
 
 log = logging.getLogger(__name__)
@@ -23,6 +34,12 @@ class TestHotSwap(YTEnvSetup):
     NUM_SCHEDULERS = 1
     STORE_LOCATION_COUNT = 2
 
+    DELTA_MASTER_CONFIG = {
+        "logging": {
+            "abort_on_alert": False,
+        },
+    }
+
     DELTA_NODE_CONFIG = {
         "tags": ["config_tag1", "config_tag2"],
         "exec_agent": {
@@ -269,3 +286,88 @@ def chunk_count():
         wait(lambda: get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid)))
 
         wait(lambda: chunk_id not in find_location_chunks(location_uuid) and chunk_id in find_location_chunks(other_location_uuid))
+
+    @authors("don-dron")
+    def test_lost_location(self):
+        update_nodes_dynamic_config({"data_node": {"abort_on_location_disabled": False, "publish_disabled_locations": True}})
+        nodes = ls("//sys/cluster_nodes")
+        assert len(nodes) == 2
+
+        create("file", "//tmp/f")
+
+        def can_write(key="a"):
+            try:
+                write_file("//tmp/f", str.encode(key))
+                return True
+            except YtError:
+                return False
+
+        # Wait until both nodes are fully up and the cluster accepts writes.
+        for node in nodes:
+            wait(lambda: exists("//sys/cluster_nodes/{0}/orchid/reboot_manager".format(node)))
+            wait(lambda: not get("//sys/cluster_nodes/{0}/orchid/reboot_manager/need_reboot".format(node)))
+            wait(lambda: get("//sys/cluster_nodes/{0}/@resource_limits/user_slots".format(node)) > 0)
+            wait(lambda: can_write())
+
+        for node in ls("//sys/cluster_nodes", attributes=["chunk_locations"]):
+            def chunk_count():
+                count = 0
+                for location_uuid, _ in get("//sys/cluster_nodes/{}/@chunk_locations".format(node)).items():
+                    wait(lambda: exists("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)))
+                    count = count + get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid))
+                return count
+            wait(lambda: chunk_count() != 0)
+
+        node = ls("//sys/cluster_nodes", attributes=["chunk_locations"])[0]
+        locations = get("//sys/cluster_nodes/{}/@chunk_locations".format(node))
+        location_uuids = list(locations.keys())
+
+        for uuid in location_uuids:
+            wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(uuid)) > 0)
+
+        location_uuid = location_uuids[0]
+
+        # Disable one location and wait until it is fully drained.
+        wait(lambda: len(disable_chunk_locations(node, [location_uuid])) > 0)
+        wait(lambda: not get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid)))
+        wait(lambda: get("//sys/chunk_locations/{}/@statistics/session_count".format(location_uuid)) == 0)
+        wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)) == 0)
+
+        # Stop publishing disabled locations, so after the restart the node's
+        # heartbeats no longer mention the disabled location at all.
+        update_nodes_dynamic_config({"data_node": {"abort_on_location_disabled": False, "publish_disabled_locations": False}})
+
+        with Restarter(self.Env, NODES_SERVICE):
+            time.sleep(2)
+
+        wait(lambda: not get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid)))
+        wait(lambda: get("//sys/chunk_locations/{}/@statistics/session_count".format(location_uuid)) == 0)
+        wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)) == 0)
+        wait(lambda: len(resurrect_chunk_locations(node, [location_uuid])) > 0)
+        wait(lambda: get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid)))
+
+        wait(lambda: can_write())
+        time.sleep(2)
+
+        def contains_alert(logs_path):
+            # Scan the zstd-compressed master logs for the expected alert line.
+            master_files = [join(logs_path, f) for f in listdir(logs_path) if "master" in f and ".log.zst" in f and isfile(join(logs_path, f))]
+
+            for file_path in master_files:
+                with open(file_path, "rb") as log_file:
+                    decompressor = zstd.ZstdDecompressor()
+                    binary_reader = decompressor.stream_reader(log_file, read_size=8192)
+                    text_stream = io.TextIOWrapper(binary_reader, encoding='utf-8')
+                    for line in text_stream:
+                        if "heartbeat with dangling location" in line:
+                            return True
+
+            return False
+
+        def check_alert():
+            res = contains_alert(self.path_to_run + "/logs")
+            if not res:
+                time.sleep(1)  # throttle rescans while the alert has not appeared yet
+            return res
+
+        wait(lambda: check_alert())

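The test drives nearly every step through the wait(...) helper imported from yt_commands. Its implementation is outside this diff; roughly, it polls a predicate until it becomes true or a deadline expires. A minimal sketch of that pattern follows, where the attempt count and sleep interval are illustrative assumptions, not the real yt_commands defaults:

import time

def wait(predicate, iter_count=100, sleep_interval=0.3):
    # Poll the predicate until it returns a truthy value; the attempt count
    # and interval here are assumptions, not the yt_commands defaults.
    for _ in range(iter_count):
        if predicate():
            return
        time.sleep(sleep_interval)
    raise RuntimeError("wait() timed out: condition never became true")

Each wait(...) call in the test above is then just one of these polling loops over a cypress read or a write probe.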
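One subtlety in the import block of this diff: it first tries import zstd and only falls back to zstandard on ImportError, yet contains_alert() always uses the zstandard-style API (ZstdDecompressor().stream_reader(...)). That only works if the module named zstd in the test environment is zstandard-compatible; the unrelated PyPI package also named zstd exposes a different interface. A hedged sketch that makes this assumption explicit:

try:
    import zstd  # assumed to be a zstandard-compatible binding in this environment
except ImportError:
    import zstandard as zstd

# Fail fast if the resolved module lacks the streaming API the test relies on.
if not hasattr(zstd, "ZstdDecompressor"):
    raise ImportError("the available 'zstd' module is not zstandard-compatible; install 'zstandard'")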