Skip to content

Commit

Permalink
Implement insert batch.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed May 22, 2020
1 parent 6dd6c40 commit 1df50f0
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
15 changes: 15 additions & 0 deletions examples/ipld_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use ipfs_embed::{Config, Store};
use ipld_collections::List;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::from_path("/tmp/list")?;
let store = Store::new(config)?;

let mut list = List::new(store, 64, 256).await?;
// push: 1024xi128; n: 4; width: 256; size: 4096
for i in 0..1024 {
list.push(i as i64).await?;
}
Ok(())
}
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ pub enum Error {
Ipld(#[from] IpldError),
#[error(transparent)]
Io(#[from] IoError),
#[error("empty batch")]
EmptyBatch,
}

impl From<Error> for StoreError {
fn from(error: Error) -> Self {
if let Error::EmptyBatch = error {
return Self::EmptyBatch;
}
Self::Other(Box::new(error))
}
}
54 changes: 40 additions & 14 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,61 @@ impl Storage {

pub fn insert(&self, cid: &Cid, data: IVec, visibility: Visibility) -> Result<(), Error> {
log::trace!("insert {}", cid.to_string());
let ipld = libipld::block::decode_ipld(cid, &data)?;
let refs = libipld::block::references(&ipld);
let refs_bytes = encode_refs(&refs);
self.insert_batch(std::iter::once((cid.clone(), data)), visibility)?;
Ok(())
}

pub fn insert_batch(
&self,
batch: impl Iterator<Item = (Cid, IVec)>,
visibility: Visibility,
) -> Result<Cid, Error> {
log::trace!("insert_batch");
let blocks: Result<Vec<(Cid, IVec, HashSet<Cid>, IVec)>, Error> = batch.into_iter()
.map(|(cid, data)| {
let ipld = libipld::block::decode_ipld(&cid, &data)?;
let refs = libipld::block::references(&ipld);
let encoded = encode_refs(&refs);
Ok((cid, data, refs, encoded))
})
.collect();
let blocks = blocks?;
if blocks.is_empty() {
return Err(Error::EmptyBatch);
}
self.tree.transaction(|tree| {
log::trace!("insert key {:?}", Key::block(cid));
let pin_key = Key::pin(cid);
if let Some(pin) = tree.get(&pin_key)? {
log::trace!("duplicate incrementing pin count");
tree.insert(pin_key, &[pin[0] + 1])?;
} else {
for cid in &refs {
let mut last_cid = None;
for (cid, data, refs, encoded_refs) in &blocks {
last_cid = Some(cid);
if tree.get(Key::block(cid))?.is_some() {
continue;
}
for cid in refs {
let refer_key = Key::refer(cid);
let refer = tree
.get(refer_key.clone())?
.map(|buf| buf[0])
.unwrap_or_default();
tree.insert(refer_key, &[refer + 1])?;
}
tree.insert(Key::block(cid), &*data)?;
tree.insert(pin_key, &[1])?;
tree.insert(Key::refs(cid), &refs_bytes)?;
tree.insert(Key::block(cid), data)?;
tree.insert(Key::refs(cid), encoded_refs)?;
if let Visibility::Public = visibility {
tree.insert(Key::public(cid), &[])?;
}
tree.remove(Key::want(cid))?;
}
let last_cid = last_cid.unwrap();
let pin_key = Key::pin(last_cid);
if let Some(pin) = tree.get(&pin_key)? {
log::trace!("duplicate incrementing pin count");
tree.insert(pin_key, &[pin[0] + 1])?;
} else {
tree.insert(pin_key, &[1])?;
}
Ok(())
})?;
Ok(())
Ok(blocks.into_iter().last().map(|(cid, _, _, _)| cid).unwrap())
}

pub async fn flush(&self) -> Result<(), Error> {
Expand Down
3 changes: 2 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ impl WritableStore for Store {
batch: Vec<Block>,
visibility: Visibility,
) -> StoreResult<'a, Cid> {
todo!()
let batch = batch.into_iter().map(|Block { cid, data }| (cid, data.into()));
Box::pin(async move { Ok(self.storage.insert_batch(batch, visibility)?) })
}

fn flush(&self) -> StoreResult<'_, ()> {
Expand Down

0 comments on commit 1df50f0

Please sign in to comment.