From 34887c869f2a6ea652e22c2c0003af52987017de Mon Sep 17 00:00:00 2001
From: Andrey Vasnetsov
Date: Tue, 29 Oct 2024 11:48:28 +0100
Subject: [PATCH] Reinit consensus (#5265)

* option to clear consensus state while preserving peer id

* compact or clear consensus WAL on re-init

* Add test on reinit

* minor changes

* more points check suggestion

---------

Co-authored-by: tellet-q
---
 .../consensus/consensus_wal.rs                |  6 ++
 .../content_manager/consensus/persistent.rs   | 39 ++++++--
 .../src/content_manager/consensus_manager.rs  | 26 +++--
 src/consensus.rs                              | 32 +++++-
 src/main.rs                                   | 19 +++-
 .../consensus_tests/test_reinit_consensus.py  | 99 +++++++++++++++++++
 tests/consensus_tests/utils.py                | 15 ++-
 7 files changed, 215 insertions(+), 21 deletions(-)
 create mode 100644 tests/consensus_tests/test_reinit_consensus.py

diff --git a/lib/storage/src/content_manager/consensus/consensus_wal.rs b/lib/storage/src/content_manager/consensus/consensus_wal.rs
index b45a724feb7..583d4b2e751 100644
--- a/lib/storage/src/content_manager/consensus/consensus_wal.rs
+++ b/lib/storage/src/content_manager/consensus/consensus_wal.rs
@@ -293,6 +293,12 @@ impl ConsensusOpWal {
             offset.wal_index + self.wal.num_entries() - 1, // there's always *at least* 1 entry, because WAL is not empty
         );
 
+        log::debug!(
+            "Compacting WAL until Raft index {}, WAL index {}",
+            until_raft_index,
+            compact_until_wal_index,
+        );
+
         // Compact WAL
         self.compacted_until_raft_index = offset.wal_to_raft(compact_until_wal_index);
         self.wal.prefix_truncate(compact_until_wal_index)?;
diff --git a/lib/storage/src/content_manager/consensus/persistent.rs b/lib/storage/src/content_manager/consensus/persistent.rs
index 9ca9747ca1b..d3fed6f3cbc 100644
--- a/lib/storage/src/content_manager/consensus/persistent.rs
+++ b/lib/storage/src/content_manager/consensus/persistent.rs
@@ -84,23 +84,46 @@ impl Persistent {
     pub fn load_or_init(
         storage_path: impl AsRef<Path>,
         first_peer: bool,
+        reinit: bool,
     ) -> Result<Self, StorageError> {
         create_dir_all(storage_path.as_ref())?;
         let path_legacy = storage_path.as_ref().join(STATE_FILE_NAME_CBOR);
         let path_json = storage_path.as_ref().join(STATE_FILE_NAME);
-        let state = if path_json.exists() {
+        let mut state = if path_json.exists() {
             log::info!("Loading raft state from {}", path_json.display());
-            Self::load_json(path_json)?
+            Self::load_json(path_json.clone())?
         } else if path_legacy.exists() {
             log::info!("Loading raft state from {}", path_legacy.display());
             let mut state = Self::load(path_legacy)?;
             // migrate to json
-            state.path = path_json;
+            state.path = path_json.clone();
             state.save()?;
             state
         } else {
             log::info!("Initializing new raft state at {}", path_json.display());
-            Self::init(path_json, first_peer)?
+            Self::init(path_json.clone(), first_peer, None)?
+        };
+
+        let state = if reinit {
+            if first_peer {
+                // Re-initializing consensus on the first peer is different from the rest.
+                // Effectively, we should remove all other peers from voters and learners,
+                // assuming that the other peers will need to join consensus again.
+                // The PeerId of the current peer should stay in the list of voters,
+                // so we can accept consensus operations.
+                state.state.conf_state.voters = vec![state.this_peer_id];
+                state.state.conf_state.learners = vec![];
+                state.state.hard_state.vote = state.this_peer_id;
+                state.save()?;
+                state
+            } else {
+                // We want to re-initialize consensus while preserving the peer ID,
+                // which is needed for migration from one cluster to another.
+                let keep_peer_id = state.this_peer_id;
+                Self::init(path_json, first_peer, Some(keep_peer_id))?
+            }
+        } else {
+            state
         };
 
         log::debug!("State: {:?}", state);
@@ -235,10 +258,14 @@ impl Persistent {
     ///
     /// `first_peer` - if this is a first peer in a new deployment (e.g. it does not bootstrap from anyone)
     /// It is `None` if distributed deployment is disabled
-    fn init(path: PathBuf, first_peer: bool) -> Result<Self, StorageError> {
+    fn init(
+        path: PathBuf,
+        first_peer: bool,
+        this_peer_id: Option<PeerId>,
+    ) -> Result<Self, StorageError> {
         // Do not generate too big peer ID, to avoid problems with serialization
         // (especially in json format)
-        let this_peer_id = rand::random::<u64>() % (1 << 53);
+        let this_peer_id = this_peer_id.unwrap_or_else(|| rand::random::<u64>() % (1 << 53));
         let voters = if first_peer {
             vec![this_peer_id]
         } else {
diff --git a/lib/storage/src/content_manager/consensus_manager.rs b/lib/storage/src/content_manager/consensus_manager.rs
index 06bbc2caeae..decbdf91e64 100644
--- a/lib/storage/src/content_manager/consensus_manager.rs
+++ b/lib/storage/src/content_manager/consensus_manager.rs
@@ -774,6 +774,10 @@ impl ConsensusManager {
         self.toc.sync_local_state()
     }
 
+    pub fn clear_wal(&self) -> Result<(), StorageError> {
+        self.wal.lock().clear()
+    }
+
     pub fn compact_wal(&self, min_entries_to_compact: u64) -> Result<bool, StorageError> {
         if min_entries_to_compact == 0 {
             return Ok(false);
         }
@@ -789,7 +793,8 @@ impl ConsensusManager {
         let first_unapplied_index = applied_index + 1;
 
-        debug_assert!(first_unapplied_index <= first_entry.index);
+        // ToDo: it seems like a mistake, need to check if it's correct
+        // debug_assert!(first_unapplied_index <= first_entry.index);
 
         if first_unapplied_index - first_entry.index < min_entries_to_compact {
             return Ok(false);
         }
@@ -924,10 +929,19 @@ impl Storage for ConsensusManager {
         _context: GetEntriesContext,
     ) -> raft::Result<Vec<Entry>> {
         let max_size: Option<_> = max_size.into();
-        if low < self.first_index()? {
+        let first_index = self.first_index()?;
+        if low < first_index {
+            log::debug!(
+                "Requested entries from {} to {} are already compacted (first index: {})",
+                low,
+                high,
+                first_index
+            );
             return Err(raft::Error::Store(raft::StorageError::Compacted));
         }
+
+        log::debug!("Requesting entries from {} to {}", low, high);
+
         if high > self.last_index()? + 1 {
             panic!(
                 "index out of bound (last: {}, high: {})",
@@ -1069,7 +1083,7 @@ mod tests {
     #[test]
     fn update_is_applied() {
         let dir = Builder::new().prefix("raft_state_test").tempdir().unwrap();
-        let mut state = Persistent::load_or_init(dir.path(), false).unwrap();
+        let mut state = Persistent::load_or_init(dir.path(), false, false).unwrap();
         assert_eq!(state.state().hard_state.commit, 0);
         state
             .apply_state_update(|state| state.hard_state.commit = 1)
@@ -1091,13 +1105,13 @@ mod tests {
     #[test]
     fn state_is_loaded() {
         let dir = Builder::new().prefix("raft_state_test").tempdir().unwrap();
-        let mut state = Persistent::load_or_init(dir.path(), false).unwrap();
+        let mut state = Persistent::load_or_init(dir.path(), false, false).unwrap();
         state
             .apply_state_update(|state| state.hard_state.commit = 1)
             .unwrap();
         assert_eq!(state.state().hard_state.commit, 1);
 
-        let state_loaded = Persistent::load_or_init(dir.path(), false).unwrap();
+        let state_loaded = Persistent::load_or_init(dir.path(), false, false).unwrap();
         assert_eq!(state_loaded.state().hard_state.commit, 1);
     }
@@ -1195,7 +1209,7 @@ mod tests {
         entries: Vec<Entry>,
         path: &std::path::Path,
     ) -> (ConsensusManager, MemStorage) {
-        let persistent = Persistent::load_or_init(path, true).unwrap();
+        let persistent = Persistent::load_or_init(path, true, false).unwrap();
         let (sender, _) = mpsc::channel();
         let consensus_state = ConsensusManager::new(
             persistent,
diff --git a/src/consensus.rs b/src/consensus.rs
index 5f0935f923b..50b9708273b 100644
--- a/src/consensus.rs
+++ b/src/consensus.rs
@@ -71,6 +71,7 @@ impl Consensus {
         telemetry_collector: Arc<Mutex<TelemetryCollector>>,
         toc: Arc<TableOfContent>,
         runtime: Handle,
+        reinit: bool,
     ) -> anyhow::Result<JoinHandle<std::io::Result<()>>> {
         let tls_client_config = helpers::load_tls_client_config(&settings)?;
 
@@ -88,6 +89,7 @@ impl Consensus {
             tls_client_config,
             channel_service,
             runtime.clone(),
+            reinit,
         )?;
 
         let state_ref_clone = state_ref.clone();
@@ -179,7 +181,24 @@ impl Consensus {
         tls_config: Option<ClientTlsConfig>,
         channel_service: ChannelService,
         runtime: Handle,
+        reinit: bool,
     ) -> anyhow::Result<(Self, Sender<Message>)> {
+        // If we want to re-initialize consensus, we need to prevent other peers
+        // from re-playing consensus WAL operations, as they should already have them applied.
+        // To ensure this, we force WAL compaction on the first re-initialized peer,
+        // which should trigger snapshot transfer instead of WAL replay.
+        let force_compact_wal = reinit && bootstrap_peer.is_none();
+
+        // During re-init, the peers that bootstrap from another peer should not keep
+        // their own consensus WAL; only the first (re-initialized) peer holds the true state.
+        // Therefore we clear the WAL here to force such peers to request a snapshot.
+        let clear_wal = reinit && bootstrap_peer.is_some();
+
+        if clear_wal {
+            log::debug!("Clearing consensus WAL to force snapshot transfer");
+            state_ref.clear_wal()?;
+        }
+
         // raft will not return entries to the application smaller or equal to `applied`
         let last_applied = state_ref.last_applied_entry().unwrap_or_default();
         let raft_config = Config {
@@ -201,7 +220,7 @@ impl Consensus {
         // bounded channel for backpressure
         let (sender, receiver) = tokio::sync::mpsc::channel(config.max_message_queue_size);
         // State might be initialized but the node might be shutdown without actually syncing or committing anything.
-        if state_ref.is_new_deployment() {
+        if state_ref.is_new_deployment() || reinit {
             let leader_established_in_ms =
                 config.tick_period_ms * raft_config.max_election_tick() as u64;
             Self::init(
@@ -243,7 +262,13 @@ impl Consensus {
         // Before consensus has started apply any unapplied committed entries
         // They might have not been applied due to unplanned Qdrant shutdown
         let _stop_consensus = state_ref.apply_entries(&mut node)?;
-        state_ref.compact_wal(config.compact_wal_entries)?;
+
+        if force_compact_wal {
+            // Make sure that the WAL is compacted on start
+            state_ref.compact_wal(1)?;
+        } else {
+            state_ref.compact_wal(config.compact_wal_entries)?;
+        }
 
         let broker = RaftMessageBroker::new(
             runtime.clone(),
@@ -1380,7 +1405,7 @@ mod tests {
         let handle = general_runtime.handle().clone();
         let (propose_sender, propose_receiver) = std::sync::mpsc::channel();
         let persistent_state =
-            Persistent::load_or_init(&settings.storage.storage_path, true).unwrap();
+            Persistent::load_or_init(&settings.storage.storage_path, true, false).unwrap();
         let operation_sender = OperationSender::new(propose_sender);
         let toc = TableOfContent::new(
             &settings.storage,
@@ -1413,6 +1438,7 @@ mod tests {
             None,
             ChannelService::new(settings.service.http_port, None),
             handle.clone(),
+            false,
         )
         .unwrap();
 
diff --git a/src/main.rs b/src/main.rs
index f081f746c6f..302b67b1423 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -119,6 +119,17 @@ struct Args {
     /// Run stacktrace collector. Used for debugging.
     #[arg(long, action, default_value_t = false)]
     stacktrace: bool,
+
+    /// Reinit consensus state.
+    /// When enabled, the service assumes that the consensus state should be re-initialized.
+    /// The exact behavior depends on whether the current node has a bootstrap URI or not.
+    /// If it does, the node removes its current consensus state and consensus WAL (while keeping its peer ID)
+    /// and tries to receive the state from the bootstrap peer.
+    /// If it does not, the node removes all other peers from the voters, promotes
+    /// the current peer to leader, and makes it the single member of the cluster.
+    /// It also compacts the consensus WAL to force a snapshot.
+    #[arg(long, action, default_value_t = false)]
+    reinit: bool,
 }
 
 fn main() -> anyhow::Result<()> {
@@ -165,8 +176,11 @@ fn main() -> anyhow::Result<()> {
     settings.validate_and_warn();
 
     // Saved state of the consensus.
-    let persistent_consensus_state =
-        Persistent::load_or_init(&settings.storage.storage_path, args.bootstrap.is_none())?;
+    let persistent_consensus_state = Persistent::load_or_init(
+        &settings.storage.storage_path,
+        args.bootstrap.is_none(),
+        args.reinit,
+    )?;
 
     let is_distributed_deployment = settings.cluster.enabled;
 
@@ -326,6 +340,7 @@ fn main() -> anyhow::Result<()> {
             tonic_telemetry_collector,
             toc_arc.clone(),
             runtime_handle.clone(),
+            args.reinit,
         )
         .expect("Can't initialize consensus");
 
diff --git a/tests/consensus_tests/test_reinit_consensus.py b/tests/consensus_tests/test_reinit_consensus.py
new file mode 100644
index 00000000000..1f8a29ee118
--- /dev/null
+++ b/tests/consensus_tests/test_reinit_consensus.py
@@ -0,0 +1,99 @@
+import pathlib
+from operator import itemgetter
+from random import randrange
+from time import sleep
+
+from .fixtures import create_collection, drop_collection
+from .utils import *
+
+N_PEERS = 2
+N_SHARDS = 2
+N_REPLICAS = 1
+N_COLLECTION_LOOPS = 3
+COLLECTION_NAME = "test_collection"
+POINTS_JSON = {
+    "points": [
+        {"id": 1, "vector": [0.05, 0.61, 0.76, 0.74], "payload": {"city": "Berlin"}},
+        {"id": 2, "vector": [0.19, 0.81, 0.75, 0.11], "payload": {"city": "London"}},
+        {"id": 3, "vector": [0.36, 0.55, 0.47, 0.94], "payload": {"city": "Paris"}},
+        {"id": 4, "vector": [0.18, 0.01, 0.85, 0.80], "payload": {"city": "Malaga"}},
+        {"id": 5, "vector": [0.24, 0.18, 0.22, 0.44], "payload": {"city": "New York"}},
+        {"id": 6, "vector": [0.35, 0.08, 0.11, 0.44], "payload": {"city": "Munich"}},
+        {"id": 7, "vector": [0.45, 0.07, 0.21, 0.04], "payload": {"city": "Madrid"}},
+        {"id": 8, "vector": [0.75, 0.18, 0.91, 0.48], "payload": {"city": "Prague"}},
+        {"id": 9, "vector": [0.30, 0.01, 0.10, 0.12], "payload": {"city": "Bratislava"}},
+        {"id": 10, "vector": [0.95, 0.8, 0.17, 0.19], "payload": {"city": "Tokyo"}},
+    ]
+}
+
+
+def get_collection_points(uri):
+    req_json = {
+        "limit": len(POINTS_JSON["points"]),
+        "with_payload": True,
+        "with_vector": True
+    }
+    res = requests.post(
+        f"{uri}/collections/{COLLECTION_NAME}/points/scroll", json=req_json
+    )
+    assert_http_ok(res)
+    return res.json()["result"]["points"]
+
+
+def compare_points(uri):
+    original_points = POINTS_JSON["points"]
+    fetched_points = get_collection_points(uri)
+    original_points, fetched_points = [sorted(l, key=itemgetter('id')) for l in (original_points, fetched_points)]
+    pairs = zip(original_points, fetched_points)
+    diff = [(x, y) for x, y in pairs if x != y]
+    assert len(diff) == 0, f'Original and final points are not equal, diff: "{diff}"'
+
+
+def test_reinit_consensus(tmp_path: pathlib.Path):
+    assert_project_root()
+    peer_urls, peer_dirs, bootstrap_url = start_cluster(tmp_path, N_PEERS)
+
+    create_collection(peer_urls[0], shard_number=N_SHARDS, replication_factor=N_REPLICAS)
+    wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls)
+
+    for i in range(N_COLLECTION_LOOPS):
+        drop_collection(peer_urls[randrange(N_PEERS)], collection=COLLECTION_NAME)
+
+        create_collection(peer_urls[randrange(N_PEERS)], shard_number=N_SHARDS, replication_factor=N_REPLICAS)
+        wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls)
+
+        r_batch = requests.put(
+            f"{peer_urls[randrange(N_PEERS)]}/collections/{COLLECTION_NAME}/points?wait=true", json=POINTS_JSON)
+        assert_http_ok(r_batch)
+
+    # assert data in both shards
+    for peer_api_uri in peer_urls:
+        info = get_collection_cluster_info(peer_api_uri, COLLECTION_NAME)
+        for shard in info["local_shards"]:
+            assert shard["points_count"] > 0
+
+    print("Stop the peers, keep data")
+    for _ in range(N_PEERS):
+        processes.pop().kill()
+
+    print("Start the peers with new urls")
+    peer_urls_new = []
+    (bootstrap_api_uri, bootstrap_uri) = start_first_peer(peer_dirs[0], "peer_0_restarted.log", reinit=True)
+    peer_urls_new.append(bootstrap_api_uri)
+    leader = wait_peer_added(bootstrap_api_uri, expected_size=2)
+    for i in range(1, len(peer_dirs)):
+        peer_urls_new.append(start_peer(peer_dirs[i], f"peer_{i}_restarted.log", bootstrap_uri, reinit=True))
+    wait_for_uniform_cluster_status(peer_urls_new, leader)
+    wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls_new)
+
+    compare_points(peer_urls_new[0])
+
+    print("Add one more peer")
+    peer_dir_additional = make_peer_folder(tmp_path, N_PEERS)
+    peer_dirs.append(peer_dir_additional)
+    peer_url_additional = start_peer(peer_dir_additional, f"peer_{N_PEERS}_additional.log", bootstrap_uri)
+    wait_peer_added(bootstrap_api_uri, expected_size=3)
+    peer_urls_new.append(peer_url_additional)
+    wait_for_uniform_cluster_status(peer_urls_new, leader)
+    wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls_new)
+    compare_points(peer_urls_new[1])
diff --git a/tests/consensus_tests/utils.py b/tests/consensus_tests/utils.py
index 1a7f2ef165e..8289b633934 100644
--- a/tests/consensus_tests/utils.py
+++ b/tests/consensus_tests/utils.py
@@ -112,7 +112,7 @@ def init_pytest_log_folder() -> str:
 
 
 # Starts a peer and returns its api_uri
-def start_peer(peer_dir: Path, log_file: str, bootstrap_uri: str, port=None, extra_env=None) -> str:
+def start_peer(peer_dir: Path, log_file: str, bootstrap_uri: str, port=None, extra_env=None, reinit=False) -> str:
     if extra_env is None:
        extra_env = {}
     p2p_port = get_port() if port is None else port + 0
@@ -132,14 +132,18 @@ def start_peer(peer_dir: Path, log_file: str, bootstrap_uri: str, port=None, ext
           f" http: http://localhost:{http_port}/cluster, p2p: {p2p_port}")
 
     this_peer_consensus_uri = get_uri(p2p_port)
-    proc = Popen([get_qdrant_exec(), "--bootstrap", bootstrap_uri, "--uri", this_peer_consensus_uri], env=env,
+    if reinit:
+        proc = Popen([get_qdrant_exec(), "--bootstrap", bootstrap_uri, "--uri", this_peer_consensus_uri, "--reinit"], env=env,
+                     cwd=peer_dir, stdout=log_file)
+    else:
+        proc = Popen([get_qdrant_exec(), "--bootstrap", bootstrap_uri, "--uri", this_peer_consensus_uri], env=env,
                  cwd=peer_dir, stdout=log_file)
     processes.append(PeerProcess(proc, http_port, grpc_port, p2p_port))
     return get_uri(http_port)
 
 
 # Starts a peer and returns its api_uri and p2p_uri
-def start_first_peer(peer_dir: Path, log_file: str, port=None, extra_env=None) -> Tuple[str, str]:
+def start_first_peer(peer_dir: Path, log_file: str, port=None, extra_env=None, reinit=False) -> Tuple[str, str]:
     if extra_env is None:
         extra_env = {}
 
@@ -160,7 +164,10 @@ def start_first_peer(peer_dir: Path, log_file: str, port=None, extra_env=None) -
     print(f"\nStarting first peer with uri {bootstrap_uri},"
           f" http: http://localhost:{http_port}/cluster, p2p: {p2p_port}")
 
-    proc = Popen([get_qdrant_exec(), "--uri", bootstrap_uri], env=env, cwd=peer_dir, stdout=log_file)
+    if reinit:
+        proc = Popen([get_qdrant_exec(), "--uri", bootstrap_uri, "--reinit"], env=env, cwd=peer_dir, stdout=log_file)
+    else:
+        proc = Popen([get_qdrant_exec(), "--uri", bootstrap_uri], env=env, cwd=peer_dir, stdout=log_file)
 
     processes.append(PeerProcess(proc, http_port, grpc_port, p2p_port))
     return get_uri(http_port), bootstrap_uri
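
For reference, the sketch below shows one way the two `--reinit` modes introduced by this patch could be exercised by hand, outside the pytest harness, mirroring what tests/consensus_tests/utils.py does with Popen. This is an editor's illustration and not part of the patch: the binary path, ports, and peer storage directories are assumptions.

# Illustrative only: drive the two --reinit code paths manually.
# Assumptions: a built qdrant binary at QDRANT_BIN, free ports 6335/6337,
# and peer_0/peer_1 storage directories left over from a previous cluster.
import subprocess

QDRANT_BIN = "./target/debug/qdrant"  # hypothetical path to the built binary

# First peer: no --bootstrap, so --reinit keeps only this peer as a voter,
# promotes it to leader, and force-compacts the consensus WAL.
first = subprocess.Popen(
    [QDRANT_BIN, "--uri", "http://127.0.0.1:6335", "--reinit"],
    cwd="peer_0",
)

# Joining peer: --bootstrap is set, so --reinit clears its consensus WAL
# (keeping its peer ID) and re-joins via snapshot transfer from the first peer.
second = subprocess.Popen(
    [QDRANT_BIN, "--bootstrap", "http://127.0.0.1:6335", "--uri", "http://127.0.0.1:6337", "--reinit"],
    cwd="peer_1",
)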