Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
Send git events to frontend. Chaos
Browse files Browse the repository at this point in the history
  • Loading branch information
rsdy committed Oct 31, 2023
1 parent 17ee8c4 commit 57dbcea
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 18 deletions.
129 changes: 127 additions & 2 deletions server/bleep/src/background/control.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::RwLock;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
};

use crate::repo::{FilterUpdate, RepoRef, SyncStatus};

Expand All @@ -14,9 +17,9 @@ enum ControlEvent {

pub struct SyncPipes {
reporef: RepoRef,
filter_updates: FilterUpdate,
progress: super::ProgressStream,
event: RwLock<Option<ControlEvent>>,
filter_updates: FilterUpdate,
}

impl SyncPipes {
Expand All @@ -33,6 +36,18 @@ impl SyncPipes {
}
}

pub(crate) fn git_sync_progress(&self) -> GitSync {
GitSync {
max: Arc::new(0.into()),
cnt: Arc::new(0.into()),
id: Default::default(),
name: Default::default(),
progress: self.progress.clone(),
reporef: self.reporef.clone(),
filter_updates: self.filter_updates.clone(),
}
}

pub(crate) fn index_percent(&self, current: u8) {
_ = self.progress.send(Progress {
reporef: self.reporef.clone(),
Expand Down Expand Up @@ -67,3 +82,113 @@ impl SyncPipes {
*self.event.write().unwrap() = Some(ControlEvent::Remove);
}
}

#[derive(Clone)]
pub(crate) struct GitSync {
max: Arc<AtomicUsize>,
cnt: Arc<AtomicUsize>,
progress: super::ProgressStream,
reporef: RepoRef,
filter_updates: FilterUpdate,
id: gix::progress::Id,
name: String,
}

impl gix::progress::Progress for GitSync {
fn init(
&mut self,
max: Option<gix::progress::prodash::progress::Step>,
_unit: Option<gix::progress::Unit>,
) {
let Some(max) = max else {
return;
};

self.max.store(max, Ordering::SeqCst);
self.cnt.store(0, Ordering::SeqCst);
}

fn set_name(&mut self, name: String) {
self.name = name;
}

fn name(&self) -> Option<String> {
Some(self.name.clone())
}

fn id(&self) -> gix::progress::Id {
self.id.clone()
}

fn message(&self, level: gix::progress::MessageLevel, message: String) {
println!("-- {level:?} : {name} {message}", name = self.name);
}
}

impl gix::progress::Count for GitSync {
fn set(&self, step: gix::progress::prodash::progress::Step) {
self.cnt.store(step, Ordering::SeqCst);

let current = ((step as f32 / self.max.load(Ordering::SeqCst) as f32) * 100f32) as u8;
// println!("-- {step:?} {name}", name = self.name);
// println!("set: {name} {}", current, name = self.name,);

_ = self.progress.send(Progress {
reporef: self.reporef.clone(),
branch_filter: self.filter_updates.branch_filter.clone(),
event: ProgressEvent::IndexPercent(current.min(100)),
});
}

fn step(&self) -> gix::progress::prodash::progress::Step {
1
}

fn inc_by(&self, step: gix::progress::prodash::progress::Step) {
self.cnt.fetch_add(step, Ordering::SeqCst);
let current = self.cnt.load(Ordering::SeqCst);

_ = self.progress.send(Progress {
reporef: self.reporef.clone(),
branch_filter: self.filter_updates.branch_filter.clone(),
event: ProgressEvent::IndexPercent((current % 100) as u8),
});
// println!("inc: {cnt}, {max}", max = self.max.load(Ordering::SeqCst));
}

fn counter(&self) -> gix::progress::StepShared {
self.cnt.clone()
}
}

impl gix::progress::NestedProgress for GitSync {
type SubProgress = Self;

fn add_child(&mut self, name: impl Into<String>) -> Self::SubProgress {
GitSync {
max: self.max.clone(),
cnt: self.cnt.clone(),
progress: self.progress.clone(),
id: self.id.clone(),
filter_updates: self.filter_updates.clone(),
reporef: self.reporef.clone(),
name: name.into(),
}
}

fn add_child_with_id(
&mut self,
name: impl Into<String>,
id: gix::progress::Id,
) -> Self::SubProgress {
GitSync {
max: self.max.clone(),
cnt: self.cnt.clone(),
progress: self.progress.clone(),
filter_updates: self.filter_updates.clone(),
reporef: self.reporef.clone(),
name: name.into(),
id,
}
}
}
5 changes: 4 additions & 1 deletion server/bleep/src/background/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ impl SyncHandle {
let mut loop_counter = 0;
let loop_max = 1;
let git_err = loop {
match creds.git_sync(&self.reporef, repo.clone()).await {
match creds
.git_sync(&self.reporef, repo.clone(), &self.pipes)
.await
{
Err(
err @ RemoteError::GitCloneFetch(gix::clone::fetch::Error::PrepareFetch(
gix::remote::fetch::prepare::Error::RefMap(
Expand Down
36 changes: 25 additions & 11 deletions server/bleep/src/remotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
use tracing::{error, warn};

use crate::{
background::SyncPipes,
remotes,
repo::{Backend, RepoError, RepoRef, Repository, SyncStatus},
Application,
Expand Down Expand Up @@ -128,9 +129,15 @@ macro_rules! creds_callback(($auth:ident) => {{
}
}});

async fn git_clone(auth: Option<GitCreds>, url: &str, target: &Path) -> Result<()> {
async fn git_clone(
auth: Option<GitCreds>,
url: &str,
target: &Path,
pipes: &SyncPipes,
) -> Result<()> {
let url = url.to_owned();
let target = target.to_owned();
let git_status = pipes.git_sync_progress();

tokio::task::spawn_blocking(move || {
let mut clone = {
Expand All @@ -144,15 +151,17 @@ async fn git_clone(auth: Option<GitCreds>, url: &str, target: &Path) -> Result<(
}
};

let (_repo, _outcome) = clone.fetch_only(gix::progress::Discard, &false.into())?;
let (_repo, _outcome) = clone.fetch_only(git_status, &false.into())?;
Ok(())
})
.await?
}

async fn git_pull(auth: Option<GitCreds>, repo: &Repository) -> Result<()> {
async fn git_pull(auth: Option<GitCreds>, repo: &Repository, pipes: &SyncPipes) -> Result<()> {
use gix::remote::Direction;

let git_status = pipes.git_sync_progress();

let disk_path = repo.disk_path.to_owned();
tokio::task::spawn_blocking(move || {
let repo = gix::open(disk_path)?;
Expand All @@ -169,8 +178,8 @@ async fn git_pull(auth: Option<GitCreds>, repo: &Repository) -> Result<()> {
};

connection
.prepare_fetch(gix::progress::Discard, Default::default())?
.receive(gix::progress::Discard, &false.into())?;
.prepare_fetch(git_status.clone(), Default::default())?
.receive(git_status, &false.into())?;

Ok(())
})
Expand Down Expand Up @@ -338,25 +347,30 @@ pub(crate) enum BackendCredential {

impl BackendCredential {
#[tracing::instrument(fields(repo=%reporef), skip_all)]
pub(crate) async fn git_sync(&self, reporef: &RepoRef, repo: Repository) -> Result<SyncStatus> {
pub(crate) async fn git_sync(
&self,
reporef: &RepoRef,
repo: Repository,
pipes: &SyncPipes,
) -> Result<SyncStatus> {
use BackendCredential::*;
let Github(gh) = self;

let synced = if repo.last_index_unix_secs == 0 && repo.disk_path.exists() {
// it is possible syncing was killed, but the repo is
// intact. pull if the dir exists, then quietly revert
// to cloning if that fails
if let Ok(success) = gh.auth.pull_repo(&repo).await {
if let Ok(success) = gh.auth.pull_repo(&repo, pipes).await {
Ok(success)
} else {
gh.auth.clone_repo(&repo).await
gh.auth.clone_repo(&repo, pipes).await
}
} else if repo.last_index_unix_secs == 0 {
gh.auth.clone_repo(&repo).await
gh.auth.clone_repo(&repo, pipes).await
} else {
let pulled = gh.auth.pull_repo(&repo).await;
let pulled = gh.auth.pull_repo(&repo, pipes).await;
if pulled.is_err() {
gh.auth.clone_repo(&repo).await
gh.auth.clone_repo(&repo, pipes).await
} else {
pulled
}
Expand Down
8 changes: 4 additions & 4 deletions server/bleep/src/remotes/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ impl From<Auth> for State {
}

impl Auth {
pub(crate) async fn clone_repo(&self, repo: &Repository) -> Result<()> {
pub(crate) async fn clone_repo(&self, repo: &Repository, pipes: &SyncPipes) -> Result<()> {
let creds = self.creds_for_private_repos(repo).await?;
git_clone(creds, &repo.remote.to_string(), &repo.disk_path).await
git_clone(creds, &repo.remote.to_string(), &repo.disk_path, pipes).await
}

pub(crate) async fn pull_repo(&self, repo: &Repository) -> Result<()> {
pub(crate) async fn pull_repo(&self, repo: &Repository, pipes: &SyncPipes) -> Result<()> {
let creds = self.creds_for_private_repos(repo).await?;
git_pull(creds, repo).await
git_pull(creds, repo, pipes).await
}

async fn creds_for_private_repos(&self, repo: &Repository) -> Result<Option<GitCreds>> {
Expand Down

0 comments on commit 57dbcea

Please sign in to comment.