Reinit consensus (qdrant#5265)
* Option to clear consensus state while preserving peer ID

* Compact or clear consensus WAL on re-init

* Add test on reinit

* Minor changes

* More points check suggestion

---------

Co-authored-by: tellet-q <elena.dubrovina@qdrant.com>
2 people authored and timvisee committed Nov 8, 2024
1 parent ee386a8 commit c71b008
Showing 7 changed files with 215 additions and 21 deletions.
6 changes: 6 additions & 0 deletions lib/storage/src/content_manager/consensus/consensus_wal.rs
@@ -293,6 +293,12 @@ impl ConsensusOpWal {
offset.wal_index + self.wal.num_entries() - 1, // there's always *at least* 1 entry, because WAL is not empty
);

log::debug!(
"Compacting WAL until Raft index {}, WAL index {}",
until_raft_index,
compact_until_wal_index,
);

// Compact WAL
self.compacted_until_raft_index = offset.wal_to_raft(compact_until_wal_index);
self.wal.prefix_truncate(compact_until_wal_index)?;
39 changes: 33 additions & 6 deletions lib/storage/src/content_manager/consensus/persistent.rs
@@ -84,23 +84,46 @@ impl Persistent {
pub fn load_or_init(
storage_path: impl AsRef<Path>,
first_peer: bool,
reinit: bool,
) -> Result<Self, StorageError> {
create_dir_all(storage_path.as_ref())?;
let path_legacy = storage_path.as_ref().join(STATE_FILE_NAME_CBOR);
let path_json = storage_path.as_ref().join(STATE_FILE_NAME);
let state = if path_json.exists() {
let mut state = if path_json.exists() {
log::info!("Loading raft state from {}", path_json.display());
Self::load_json(path_json)?
Self::load_json(path_json.clone())?
} else if path_legacy.exists() {
log::info!("Loading raft state from {}", path_legacy.display());
let mut state = Self::load(path_legacy)?;
// migrate to json
state.path = path_json;
state.path = path_json.clone();
state.save()?;
state
} else {
log::info!("Initializing new raft state at {}", path_json.display());
Self::init(path_json, first_peer)?
Self::init(path_json.clone(), first_peer, None)?
};

let state = if reinit {
if first_peer {
// Re-initializing consensus on the first peer is different from the rest.
// Effectively, we should remove all other peers from voters and learners,
// assuming that other peers will need to join consensus again.
// The PeerId of the current peer should stay in the list of voters,
// so we can accept consensus operations.
state.state.conf_state.voters = vec![state.this_peer_id];
state.state.conf_state.learners = vec![];
state.state.hard_state.vote = state.this_peer_id;
state.save()?;
state
} else {
// We want to re-initialize consensus while preserving the peer ID,
// which is needed for migration from one cluster to another.
let keep_peer_id = state.this_peer_id;
Self::init(path_json, first_peer, Some(keep_peer_id))?
}
} else {
state
};

log::debug!("State: {:?}", state);
@@ -235,10 +258,14 @@ impl Persistent {
///
/// `first_peer` - if this is the first peer in a new deployment (i.e. it does not bootstrap from anyone)
/// It is `None` if distributed deployment is disabled
fn init(path: PathBuf, first_peer: bool) -> Result<Self, StorageError> {
fn init(
path: PathBuf,
first_peer: bool,
this_peer_id: Option<PeerId>,
) -> Result<Self, StorageError> {
// Do not generate too big peer ID, to avoid problems with serialization
// (especially in json format)
let this_peer_id = rand::random::<PeerId>() % (1 << 53);
let this_peer_id = this_peer_id.unwrap_or_else(|| rand::random::<PeerId>() % (1 << 53));
let voters = if first_peer {
vec![this_peer_id]
} else {
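For orientation, here is a minimal usage sketch of the extended `load_or_init` signature (not part of this commit; the module paths and storage path are assumptions, and errors are simply propagated):

use std::path::Path;

use storage::content_manager::consensus::persistent::Persistent;
use storage::content_manager::errors::StorageError;

// Illustrative only: shows the two `reinit` modes introduced above.
fn reinit_examples(storage_path: &Path) -> Result<(), StorageError> {
    // First peer (no bootstrap URI): existing state is kept, but voters/learners are
    // reset so this peer becomes the only voter and can still accept consensus operations.
    let _first = Persistent::load_or_init(storage_path, true, true)?;

    // Bootstrapped peer: state is re-created from scratch, preserving only the previous
    // peer ID so the peer can be migrated into a new cluster.
    let _rejoining = Persistent::load_or_init(storage_path, false, true)?;

    Ok(())
}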
26 changes: 20 additions & 6 deletions lib/storage/src/content_manager/consensus_manager.rs
@@ -774,6 +774,10 @@ impl<C: CollectionContainer> ConsensusManager<C> {
self.toc.sync_local_state()
}

pub fn clear_wal(&self) -> Result<(), StorageError> {
self.wal.lock().clear()
}

pub fn compact_wal(&self, min_entries_to_compact: u64) -> Result<bool, StorageError> {
if min_entries_to_compact == 0 {
return Ok(false);
@@ -789,7 +793,8 @@ impl<C: CollectionContainer> ConsensusManager<C> {

let first_unapplied_index = applied_index + 1;

debug_assert!(first_unapplied_index <= first_entry.index);
// TODO: this looks like a mistake; verify whether the assertion is correct before re-enabling it
// debug_assert!(first_unapplied_index <= first_entry.index);

if first_unapplied_index - first_entry.index < min_entries_to_compact {
return Ok(false);
@@ -924,10 +929,19 @@ impl<C: CollectionContainer> Storage for ConsensusManager<C> {
_context: GetEntriesContext,
) -> raft::Result<Vec<RaftEntry>> {
let max_size: Option<_> = max_size.into();
if low < self.first_index()? {
let first_index = self.first_index()?;
if low < first_index {
log::debug!(
"Requested entries from {} to {} are already compacted (first index: {})",
low,
high,
first_index
);
return Err(raft::Error::Store(raft::StorageError::Compacted));
}

log::debug!("Requesting entries from {} to {}", low, high);

if high > self.last_index()? + 1 {
panic!(
"index out of bound (last: {}, high: {})",
@@ -1069,7 +1083,7 @@ mod tests {
#[test]
fn update_is_applied() {
let dir = Builder::new().prefix("raft_state_test").tempdir().unwrap();
let mut state = Persistent::load_or_init(dir.path(), false).unwrap();
let mut state = Persistent::load_or_init(dir.path(), false, false).unwrap();
assert_eq!(state.state().hard_state.commit, 0);
state
.apply_state_update(|state| state.hard_state.commit = 1)
@@ -1091,13 +1105,13 @@
#[test]
fn state_is_loaded() {
let dir = Builder::new().prefix("raft_state_test").tempdir().unwrap();
let mut state = Persistent::load_or_init(dir.path(), false).unwrap();
let mut state = Persistent::load_or_init(dir.path(), false, false).unwrap();
state
.apply_state_update(|state| state.hard_state.commit = 1)
.unwrap();
assert_eq!(state.state().hard_state.commit, 1);

let state_loaded = Persistent::load_or_init(dir.path(), false).unwrap();
let state_loaded = Persistent::load_or_init(dir.path(), false, false).unwrap();
assert_eq!(state_loaded.state().hard_state.commit, 1);
}

@@ -1195,7 +1209,7 @@ mod tests {
entries: Vec<Entry>,
path: &std::path::Path,
) -> (ConsensusManager<NoCollections>, MemStorage) {
let persistent = Persistent::load_or_init(path, true).unwrap();
let persistent = Persistent::load_or_init(path, true, false).unwrap();
let (sender, _) = mpsc::channel();
let consensus_state = ConsensusManager::new(
persistent,
32 changes: 29 additions & 3 deletions src/consensus.rs
@@ -71,6 +71,7 @@ impl Consensus {
telemetry_collector: Arc<parking_lot::Mutex<TonicTelemetryCollector>>,
toc: Arc<TableOfContent>,
runtime: Handle,
reinit: bool,
) -> anyhow::Result<JoinHandle<std::io::Result<()>>> {
let tls_client_config = helpers::load_tls_client_config(&settings)?;

@@ -88,6 +89,7 @@
tls_client_config,
channel_service,
runtime.clone(),
reinit,
)?;

let state_ref_clone = state_ref.clone();
@@ -179,7 +181,24 @@
tls_config: Option<ClientTlsConfig>,
channel_service: ChannelService,
runtime: Handle,
reinit: bool,
) -> anyhow::Result<(Self, Sender<Message>)> {
// If we want to re-initialize consensus, we need to prevent other peers
// from replaying consensus WAL operations, as they should already have them applied.
// To ensure that, we force WAL compaction on the first re-initialized peer,
// which should trigger snapshot transfer instead of WAL replay.
let force_compact_wal = reinit && bootstrap_peer.is_none();

// During reinit, only the first (non-bootstrapped) peer should hold the true consensus state.
// Peers that bootstrap from it must not keep their own consensus history.
// Therefore we clear the WAL on bootstrapped peers to force them to request a snapshot.
let clear_wal = reinit && bootstrap_peer.is_some();

if clear_wal {
log::debug!("Clearing WAL on the bootstrap peer to force snapshot transfer");
state_ref.clear_wal()?;
}

// raft will not return entries to the application smaller or equal to `applied`
let last_applied = state_ref.last_applied_entry().unwrap_or_default();
let raft_config = Config {
@@ -201,7 +220,7 @@
// bounded channel for backpressure
let (sender, receiver) = tokio::sync::mpsc::channel(config.max_message_queue_size);
// State might be initialized but the node might be shutdown without actually syncing or committing anything.
if state_ref.is_new_deployment() {
if state_ref.is_new_deployment() || reinit {
let leader_established_in_ms =
config.tick_period_ms * raft_config.max_election_tick() as u64;
Self::init(
@@ -243,7 +262,13 @@
// Before consensus has started apply any unapplied committed entries
// They might have not been applied due to unplanned Qdrant shutdown
let _stop_consensus = state_ref.apply_entries(&mut node)?;
state_ref.compact_wal(config.compact_wal_entries)?;

if force_compact_wal {
// Making sure that the WAL will be compacted on start
state_ref.compact_wal(1)?;
} else {
state_ref.compact_wal(config.compact_wal_entries)?;
}

let broker = RaftMessageBroker::new(
runtime.clone(),
@@ -1380,7 +1405,7 @@ mod tests {
let handle = general_runtime.handle().clone();
let (propose_sender, propose_receiver) = std::sync::mpsc::channel();
let persistent_state =
Persistent::load_or_init(&settings.storage.storage_path, true).unwrap();
Persistent::load_or_init(&settings.storage.storage_path, true, false).unwrap();
let operation_sender = OperationSender::new(propose_sender);
let toc = TableOfContent::new(
&settings.storage,
@@ -1413,6 +1438,7 @@
None,
ChannelService::new(settings.service.http_port, None),
handle.clone(),
false,
)
.unwrap();

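A short sketch (not in the diff; the helper name is illustrative) restating how the two reinit-related flags in `Consensus::new` are derived:

// Mirrors the logic above: which WAL action each peer takes when `--reinit` is set.
fn reinit_wal_actions(reinit: bool, has_bootstrap_uri: bool) -> (bool, bool) {
    // The first re-initialized peer keeps its WAL but force-compacts it, so that
    // joining peers receive a snapshot instead of replaying old WAL entries.
    let force_compact_wal = reinit && !has_bootstrap_uri;
    // Re-initialized peers with a bootstrap URI clear their WAL, so that the first
    // peer remains the single source of truth and a snapshot transfer is forced.
    let clear_wal = reinit && has_bootstrap_uri;
    (force_compact_wal, clear_wal)
}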
19 changes: 17 additions & 2 deletions src/main.rs
@@ -119,6 +119,17 @@ struct Args {
/// Run stacktrace collector. Used for debugging.
#[arg(long, action, default_value_t = false)]
stacktrace: bool,

/// Reinit consensus state.
/// When enabled, the service assumes the consensus state should be reinitialized.
/// The exact behavior depends on whether the current node has a bootstrap URI or not.
/// If it does, it removes the current consensus state and consensus WAL (while keeping the peer ID)
/// and tries to receive state from the bootstrap peer.
/// If it doesn't, it removes other peers from the voters, promotes
/// the current peer to be the leader and the single member of the cluster,
/// and also compacts the consensus WAL to force a snapshot transfer.
#[arg(long, action, default_value_t = false)]
reinit: bool,
}

fn main() -> anyhow::Result<()> {
@@ -165,8 +176,11 @@ fn main() -> anyhow::Result<()> {
settings.validate_and_warn();

// Saved state of the consensus.
let persistent_consensus_state =
Persistent::load_or_init(&settings.storage.storage_path, args.bootstrap.is_none())?;
let persistent_consensus_state = Persistent::load_or_init(
&settings.storage.storage_path,
args.bootstrap.is_none(),
args.reinit,
)?;

let is_distributed_deployment = settings.cluster.enabled;

@@ -326,6 +340,7 @@ fn main() -> anyhow::Result<()> {
tonic_telemetry_collector,
toc_arc.clone(),
runtime_handle.clone(),
args.reinit,
)
.expect("Can't initialize consensus");

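The `--reinit` documentation above distinguishes two modes depending on the bootstrap URI; the following hedged sketch (illustrative names, not part of this commit) captures that decision:

/// Illustrative only: names the two behaviors of `--reinit`.
enum ReinitMode {
    /// No bootstrap URI: keep state and peer ID, become the sole voter and leader,
    /// and force-compact the consensus WAL so other peers must fetch a snapshot.
    PromoteSelf,
    /// Bootstrap URI present: drop consensus state and WAL (keeping the peer ID)
    /// and receive the state from the bootstrap peer.
    RejoinViaSnapshot,
}

fn reinit_mode(bootstrap_uri: Option<&str>) -> ReinitMode {
    match bootstrap_uri {
        None => ReinitMode::PromoteSelf,
        Some(_) => ReinitMode::RejoinViaSnapshot,
    }
}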
99 changes: 99 additions & 0 deletions tests/consensus_tests/test_reinit_consensus.py
@@ -0,0 +1,99 @@
import pathlib
from operator import itemgetter
from random import randrange
from time import sleep

from .fixtures import create_collection, drop_collection
from .utils import *

N_PEERS = 2
N_SHARDS = 2
N_REPLICAS = 1
N_COLLECTION_LOOPS = 3
COLLECTION_NAME = "test_collection"
POINTS_JSON = {
"points": [
{"id": 1, "vector": [0.05, 0.61, 0.76, 0.74], "payload": {"city": "Berlin"}},
{"id": 2, "vector": [0.19, 0.81, 0.75, 0.11], "payload": {"city": "London"}},
{"id": 3, "vector": [0.36, 0.55, 0.47, 0.94], "payload": {"city": "Paris"}},
{"id": 4, "vector": [0.18, 0.01, 0.85, 0.80], "payload": {"city": "Malaga"}},
{"id": 5, "vector": [0.24, 0.18, 0.22, 0.44], "payload": {"city": "New York"}},
{"id": 6, "vector": [0.35, 0.08, 0.11, 0.44], "payload": {"city": "Munich"}},
{"id": 7, "vector": [0.45, 0.07, 0.21, 0.04], "payload": {"city": "Madrid"}},
{"id": 8, "vector": [0.75, 0.18, 0.91, 0.48], "payload": {"city": "Prague"}},
{"id": 9, "vector": [0.30, 0.01, 0.10, 0.12], "payload": {"city": "Bratislava"}},
{"id": 10, "vector": [0.95, 0.8, 0.17, 0.19], "payload": {"city": "Tokyo"}},
]
}


def get_collection_points(uri):
req_json = {
"limit": len(POINTS_JSON["points"]),
"with_payload": True,
"with_vector": True
}
res = requests.post(
f"{uri}/collections/{COLLECTION_NAME}/points/scroll", json=req_json
)
assert_http_ok(res)
return res.json()["result"]["points"]


def compare_points(uri):
original_points = POINTS_JSON["points"]
fetched_points = get_collection_points(uri)
original_points, fetched_points = [sorted(l, key=itemgetter('id')) for l in (original_points, fetched_points)]
pairs = zip(original_points, fetched_points)
diff = [(x, y) for x, y in pairs if x != y]
assert len(diff) == 0, f'Original and final points are not equal, diff: "{diff}"'


def test_reinit_consensus(tmp_path: pathlib.Path):
assert_project_root()
peer_urls, peer_dirs, bootstrap_url = start_cluster(tmp_path, N_PEERS)

create_collection(peer_urls[0], shard_number=N_SHARDS, replication_factor=N_REPLICAS)
wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls)

for i in range(N_COLLECTION_LOOPS):
drop_collection(peer_urls[randrange(N_PEERS)], collection=COLLECTION_NAME)

create_collection(peer_urls[randrange(N_PEERS)], shard_number=N_SHARDS, replication_factor=N_REPLICAS)
wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls)

r_batch = requests.put(
f"{peer_urls[randrange(N_PEERS)]}/collections/{COLLECTION_NAME}/points?wait=true", json=POINTS_JSON)
assert_http_ok(r_batch)

# assert data in both shards
for peer_api_uri in peer_urls:
info = get_collection_cluster_info(peer_api_uri, COLLECTION_NAME)
for shard in info["local_shards"]:
assert shard["points_count"] > 0

print("Stop the peers, keep data")
for _ in range(N_PEERS):
processes.pop().kill()

print("Start the peers with new urls")
peer_urls_new = []
(bootstrap_api_uri, bootstrap_uri) = start_first_peer(peer_dirs[0], "peer_0_restarted.log", reinit=True)
peer_urls_new.append(bootstrap_api_uri)
leader = wait_peer_added(bootstrap_api_uri, expected_size=2)
for i in range(1, len(peer_dirs)):
peer_urls_new.append(start_peer(peer_dirs[i], f"peer_{i}_restarted.log", bootstrap_uri, reinit=True))
wait_for_uniform_cluster_status(peer_urls_new, leader)
wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls_new)

compare_points(peer_urls_new[0])

print("Add one more peer")
peer_dir_additional = make_peer_folder(tmp_path, N_PEERS)
peer_dirs.append(peer_dir_additional)
peer_url_additional = start_peer(peer_dir_additional, f"peer_{N_PEERS}_additional.log", bootstrap_uri)
wait_peer_added(bootstrap_api_uri, expected_size=3)
peer_urls_new.append(peer_url_additional)
wait_for_uniform_cluster_status(peer_urls_new, leader)
wait_collection_exists_and_active_on_all_peers(collection_name=COLLECTION_NAME, peer_api_uris=peer_urls_new)
compare_points(peer_urls_new[1])