Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): added client.wait_for_start() and improve error handling in sync daemon #495

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ impl AwClient {
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
proxy_method!(get_event_count, i64, bucketname: &str);
proxy_method!(get_info, aw_models::Info,);

pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
self.client.wait_for_start()
}
}
36 changes: 36 additions & 0 deletions aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error};

use chrono::{DateTime, Utc};
use serde_json::{json, Map};
use std::net::TcpStream;
use std::time::Duration;

pub use aw_models::{Bucket, BucketMetadata, Event};

Expand Down Expand Up @@ -221,4 +223,38 @@ impl AwClient {
let url = format!("{}/api/0/info", self.baseurl);
self.client.get(url).send().await?.json().await
}

// TODO: make async
ErikBjare marked this conversation as resolved.
Show resolved Hide resolved
pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
let socket_addrs = self.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.first()
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running with exponential backoff
let mut retry_delay = Duration::from_millis(100);
let max_wait = Duration::from_secs(10);
let mut total_wait = Duration::from_secs(0);

while total_wait < max_wait {
match TcpStream::connect_timeout(socket_addr, retry_delay) {
Ok(_) => break,
Err(_) => {
std::thread::sleep(retry_delay);
total_wait += retry_delay;
retry_delay *= 2;
}
}
}

if total_wait >= max_wait {
return Err(format!(
"Local server {} not running after 10 seconds of retrying",
socket_addr
)
.into());
}

Ok(())
}
}
21 changes: 15 additions & 6 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,23 @@ fn main() -> Result<(), Box<dyn Error>> {

fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
loop {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;
if let Err(e) = daemon_sync_cycle(client) {
error!("Error during sync cycle: {}", e);
// Re-throw the error
return Err(e);
}

info!("Sync pass done, sleeping for 5 minutes");

std::thread::sleep(std::time::Duration::from_secs(300));
}
}

fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;

Ok(())
}
11 changes: 1 addition & 10 deletions aw-sync/src/sync_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;
use std::fs;
use std::net::TcpStream;

use crate::sync::{sync_run, SyncMode, SyncSpec};
use aw_client_rust::blocking::AwClient;
Expand All @@ -14,15 +13,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box<dyn Error>> {
}

pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
let socket_addrs = client.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.get(0)
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running
if TcpStream::connect(socket_addr).is_err() {
return Err(format!("Local server {} not running", &client.baseurl).into());
}
client.wait_for_start()?;

// Path to the sync folder
// Sync folder is structured ./{hostname}/{device_id}/test.db
Expand Down
Loading