Skip to content

Commit

Permalink
Send Raft messages with gRPC client (qdrant#509)
Browse files Browse the repository at this point in the history
* Send Raft messages with gRPC client

* Review: more specific thread names
e-ivkov authored Apr 27, 2022
1 parent 5a34086 commit d4bb6e4
Showing 9 changed files with 153 additions and 22 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@ serde = { version = "~1.0", features = ["derive"] }
serde_json = "~1.0"
schemars = { version = "0.8.8", features = ["uuid"] }
itertools = "0.10"
anyhow = "1.0.57"
futures = "0.3.21"

config = "~0.13.1"

1 change: 1 addition & 0 deletions lib/api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ serde = { version = "~1.0", features = ["derive"] }
serde_json = "~1.0"
schemars = { version = "0.8.8", features = ["uuid"] }
uuid = { version = "0.8", features = ["v4", "serde"] }
tower = "0.4.12"

segment = {path = "../segment"}

13 changes: 13 additions & 0 deletions lib/api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -4,6 +4,19 @@ pub mod models;
#[rustfmt::skip] // tonic uses `prettyplease` to format its output
pub mod qdrant;

use std::time::Duration;

use tonic::transport::{Channel, Error, Uri};
use tower::timeout::Timeout;

pub const fn api_crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}

pub async fn timeout_channel(
timeout: Duration,
peer_address: Uri,
) -> Result<Timeout<Channel>, Error> {
let channel = Channel::builder(peer_address).connect().await?;
Ok(Timeout::new(channel, timeout))
}
1 change: 1 addition & 0 deletions lib/storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ itertools = "0.10"
async-trait = "0.1.53"
log = "0.4"
tonic = "0.7.1"
http = "0.2"

# Consensus related
atomicwrites = { version = "0.3.1", optional = true }
34 changes: 32 additions & 2 deletions lib/storage/src/content_manager/raft_state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{
collections::HashMap,
fs::File,
io::BufWriter,
path::{Path, PathBuf},
};

use atomicwrites::{AtomicFile, OverwriteBehavior::AllowOverwrite};
use itertools::Itertools;
use prost::Message;
use raft::{
eraftpb::{ConfState, HardState},
@@ -53,7 +55,7 @@ impl UnappliedEntries {
pub struct Persistent {
state: RaftStateWrapper,
unapplied_entries: UnappliedEntries,
peer_address_by_id: PeerAddressById,
peer_address_by_id: PeerAddressByIdWrapper,
this_peer_id: u64,
#[serde(skip)]
path: PathBuf,
@@ -108,7 +110,7 @@ impl Persistent {
}

pub fn peer_address_by_id(&self) -> &PeerAddressById {
&self.peer_address_by_id
&self.peer_address_by_id.0
}

pub fn this_peer_id(&self) -> u64 {
@@ -147,6 +149,34 @@ impl Persistent {
}
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(try_from = "HashMap<u64, String>")]
#[serde(into = "HashMap<u64, String>")]
struct PeerAddressByIdWrapper(PeerAddressById);

impl From<PeerAddressByIdWrapper> for HashMap<u64, String> {
fn from(wrapper: PeerAddressByIdWrapper) -> Self {
wrapper
.0
.into_iter()
.map(|(id, address)| (id, format!("{address}")))
.collect()
}
}

impl TryFrom<HashMap<u64, String>> for PeerAddressByIdWrapper {
type Error = http::uri::InvalidUri;

fn try_from(value: HashMap<u64, String>) -> Result<Self, Self::Error> {
Ok(PeerAddressByIdWrapper(
value
.into_iter()
.map(|(id, address)| address.parse().map(|address| (id, address)))
.try_collect()?,
))
}
}

#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(try_from = "SerializableRaftState")]
#[serde(into = "SerializableRaftState")]
5 changes: 3 additions & 2 deletions lib/storage/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{collections::HashMap, net::SocketAddr};
use std::collections::HashMap;

use collection::config::WalConfig;
use collection::optimizers_builder::OptimizersConfig;
use schemars::JsonSchema;
use segment::types::HnswConfig;
use serde::{Deserialize, Serialize};
use tonic::transport::Uri;

pub type PeerAddressById = HashMap<u64, SocketAddr>;
pub type PeerAddressById = HashMap<u64, Uri>;

#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
pub struct PerformanceConfig {
109 changes: 94 additions & 15 deletions src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use std::{
sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender},
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::{self, Receiver, RecvTimeoutError, SyncSender},
},
time::{Duration, Instant},
};

use storage::content_manager::toc::TableOfContentRef;
use anyhow::Context;
use api::grpc::{
qdrant::{raft_client::RaftClient, RaftMessage as GrpcRaftMessage},
timeout_channel,
};
use storage::content_manager::{errors::StorageError, toc::TableOfContentRef};

use raft::{eraftpb::Message as RaftMessage, prelude::*};
use tokio::runtime::Runtime;
use tonic::transport::Uri;

const CHANNEL_CAPACITY: usize = 100;
const TICK_PERIOD_MS: u64 = 100;
const MESSAGE_TIMEOUT: Duration = Duration::from_millis(1000);

type Node = RawNode<TableOfContentRef>;

@@ -20,6 +31,7 @@ pub enum Message {
pub struct Consensus {
node: Node,
receiver: Receiver<Message>,
runtime: Runtime,
}

impl Consensus {
@@ -37,7 +49,21 @@ impl Consensus {
toc_ref.apply_entries()?;
let node = Node::new(&config, toc_ref, logger)?;
let (sender, receiver) = mpsc::sync_channel(CHANNEL_CAPACITY);
Ok((Self { node, receiver }, sender))
Ok((
Self {
node,
receiver,
runtime: tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("consensus-tokio-rt-{}", id)
})
.enable_all()
.build()?,
},
sender,
))
}

pub fn start(&mut self) -> raft::Result<()> {
@@ -67,12 +93,12 @@ impl Consensus {
} else {
timeout -= d;
}
on_ready(&mut self.node);
on_ready(&mut self.node, &self.runtime);
}
}
}

fn on_ready(raft_group: &mut Node) {
fn on_ready(raft_group: &mut Node, runtime: &Runtime) {
if !raft_group.has_ready() {
return;
}
@@ -81,8 +107,9 @@ fn on_ready(raft_group: &mut Node) {
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();
if !ready.messages().is_empty() {
// Send out the messages come from the node.
handle_messages(ready.take_messages());
if let Err(err) = handle_messages(ready.take_messages(), &store, runtime) {
log::error!("Failed to send messages: {err}")
}
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
@@ -109,8 +136,9 @@ fn on_ready(raft_group: &mut Node) {
}
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
handle_messages(ready.take_persisted_messages());
if let Err(err) = handle_messages(ready.take_persisted_messages(), &store, runtime) {
log::error!("Failed to send messages: {err}")
}
}

// Advance the Raft.
@@ -122,8 +150,9 @@ fn on_ready(raft_group: &mut Node) {
log::error!("Failed to set commit index: {err}")
}
}
// Send out the messages.
handle_messages(light_rd.take_messages());
if let Err(err) = handle_messages(light_rd.take_messages(), &store, runtime) {
log::error!("Failed to send messages: {err}")
}
// Apply all committed entries.
if let Err(err) = handle_committed_entries(light_rd.take_committed_entries(), &store) {
log::error!("Failed to apply committed entries: {err}")
@@ -140,10 +169,60 @@ fn handle_committed_entries(entries: Vec<Entry>, toc: &TableOfContentRef) -> raf
Ok(())
}

fn handle_messages(messages: Vec<RaftMessage>) {
for _message in messages {
// TODO: send to other peers
}
fn handle_messages(
messages: Vec<RaftMessage>,
toc: &TableOfContentRef,
runtime: &Runtime,
) -> Result<(), StorageError> {
let peer_address_by_id = toc.peer_address_by_id()?;
let messages_with_address: Vec<_> = messages
.into_iter()
.map(|message| {
let address = peer_address_by_id.get(&message.to).cloned();
(message, address)
})
.collect();
let future = async move {
let mut send_futures = Vec::new();
for (message, address) in messages_with_address {
let address = match address {
Some(address) => address,
None => {
log::warn!(
"Address of peer with ID {} not found. Message was not sent to it.",
message.to
);
continue;
}
};
send_futures.push(send_message(address, message));
}
for result in futures::future::join_all(send_futures).await {
if let Err(err) = result {
log::warn!("Failed to send message: {err}")
}
}
};
// Raft does not need the responses and should not wait for timeouts
// so sending messages in parallel should be ok
runtime.spawn(future);
Ok(())
}

async fn send_message(address: Uri, message: RaftMessage) -> anyhow::Result<()> {
let channel = timeout_channel(MESSAGE_TIMEOUT, address)
.await
.context("Failed to create timeout channel")?;
let mut client = RaftClient::new(channel);
let mut bytes = Vec::new();
<RaftMessage as prost::Message>::encode(&message, &mut bytes)
.context("Failed to serialize Raft message")?;
let message = GrpcRaftMessage { message: bytes };
client
.send(tonic::Request::new(message))
.await
.context("gRPC call failed")?;
Ok(())
}

#[cfg(test)]
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ fn main() -> std::io::Result<()> {
let (mut consensus, message_sender) = Consensus::new(&slog_logger, toc_arc.clone().into())
.expect("Can't initialize consensus");
thread::Builder::new()
.name("raft".to_string())
.name("consensus".to_string())
.spawn(move || {
if let Err(err) = consensus.start() {
log::error!("Consensus stopped with error: {err}")

0 comments on commit d4bb6e4

Please sign in to comment.