From b20dc76c418ca4c52800010c2c4c29f87a060026 Mon Sep 17 00:00:00 2001 From: don-dron Date: Fri, 5 May 2023 14:04:34 +0300 Subject: [PATCH] YT-19048: Add test for location lost on master --- yt/yt/tests/integration/node/test_hot_swap.py | 104 +++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/yt/yt/tests/integration/node/test_hot_swap.py b/yt/yt/tests/integration/node/test_hot_swap.py index a8346cab561c..dc54078e4db8 100644 --- a/yt/yt/tests/integration/node/test_hot_swap.py +++ b/yt/yt/tests/integration/node/test_hot_swap.py @@ -1,6 +1,6 @@ import logging -from yt_env_setup import YTEnvSetup +from yt_env_setup import YTEnvSetup, Restarter, NODES_SERVICE from yt_commands import ( authors, wait, create_access_control_object_namespace, @@ -10,8 +10,19 @@ write_file, disable_chunk_locations, resurrect_chunk_locations) +import time + from yt.common import YtError +from os import listdir +from os.path import isfile, join + +import io +try: + import zstd +except ImportError: + import zstandard as zstd + ################################################################## log = logging.getLogger(__name__) @@ -23,6 +34,12 @@ class TestHotSwap(YTEnvSetup): NUM_SCHEDULERS = 1 STORE_LOCATION_COUNT = 2 + DELTA_MASTER_CONFIG = { + "logging": { + "abort_on_alert": False, + }, + } + DELTA_NODE_CONFIG = { "tags": ["config_tag1", "config_tag2"], "exec_agent": { @@ -269,3 +286,88 @@ def chunk_count(): wait(lambda: get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid))) wait(lambda: chunk_id not in find_location_chunks(location_uuid) and chunk_id in find_location_chunks(other_location_uuid)) + + @authors("don-dron") + def test_lost_location(self): + update_nodes_dynamic_config({"data_node": {"abort_on_location_disabled": False, "publish_disabled_locations": True}}) + nodes = ls("//sys/cluster_nodes") + assert len(nodes) == 2 + + create("file", "//tmp/f") + + def can_write(key="a"): + try: + write_file("//tmp/f", str.encode(key)) + return True + except YtError: + return False + + for node in nodes: + wait(lambda: exists("//sys/cluster_nodes/{0}/orchid/reboot_manager".format(node))) + wait(lambda: not get("//sys/cluster_nodes/{0}/orchid/reboot_manager/need_reboot".format(node))) + wait(lambda: get("//sys/cluster_nodes/{0}/@resource_limits/user_slots".format(node)) > 0) + wait(lambda: can_write()) + + for node in ls("//sys/cluster_nodes", attributes=["chunk_locations"]): + def chunk_count(): + count = 0 + for location_uuid, _ in get("//sys/cluster_nodes/{}/@chunk_locations".format(node)).items(): + wait(lambda: can_write("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid))) + count = count + get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)) + return count + wait(lambda: chunk_count() != 0) + + node = ls("//sys/cluster_nodes", attributes=["chunk_locations"])[0] + locations = get("//sys/cluster_nodes/{}/@chunk_locations".format(node)) + location_uuids = list(locations.keys()) + + for uuid in location_uuids: + wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(uuid)) > 0) + + location_uuid = location_uuids[0] + + wait(lambda: len(disable_chunk_locations(node, [location_uuid])) > 0) + wait(lambda: not get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid))) + wait(lambda: get("//sys/chunk_locations/{}/@statistics/session_count".format(location_uuid)) == 0) + wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)) == 0) + + # Disable disabled location publishing + update_nodes_dynamic_config({"data_node": {"abort_on_location_disabled": False, "publish_disabled_locations": False}}) + + with Restarter(self.Env, NODES_SERVICE): + time.sleep(2) + pass + + wait(lambda: not get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid))) + wait(lambda: get("//sys/chunk_locations/{}/@statistics/session_count".format(location_uuid)) == 0) + wait(lambda: get("//sys/chunk_locations/{}/@statistics/chunk_count".format(location_uuid)) == 0) + wait(lambda: len(resurrect_chunk_locations(node, [location_uuid])) > 0) + wait(lambda: get("//sys/chunk_locations/{}/@statistics/enabled".format(location_uuid))) + + wait(lambda: can_write()) + time.sleep(2) + + def contains_alert(logs_path): + master_files = [join(logs_path, f) for f in listdir(logs_path) if "master" in f and ".log.zst" in f and isfile(join(logs_path, f))] + + for file_path in master_files: + with open(file_path, "rb") as log_file: + decompressor = zstd.ZstdDecompressor() + binary_reader = decompressor.stream_reader(log_file, read_size=8192) + text_stream = io.TextIOWrapper(binary_reader, encoding='utf-8') + for line in text_stream: + if "heartbeat with dangling location" in line: + return True + + return False + + def check_alert(): + res = contains_alert(self.path_to_run + "/logs") + + if res: + return res + else: + time.sleep(1) + return res + + wait(lambda: check_alert())