Skip to content

Commit

Permalink
Grace period (#25)
Browse files Browse the repository at this point in the history
* Flush on alias.

* Add grace period.

* Remove cuckoofilter.

* Use u64.
  • Loading branch information
dvc94ch authored Nov 24, 2020
1 parent 41da123 commit d752faf
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 57 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ repository = "https://github.com/ipfs-rust/ipfs-embed"
[dependencies]
anyhow = "1.0.34"
async-std = { version = "1.7.0", features = ["unstable"] }
cuckoofilter = "0.5.0"
fnv = "1.0.7"
futures = "0.3.8"
ipfs-embed-core = { version = "0.9.0", path = "../core" }
Expand Down
49 changes: 33 additions & 16 deletions db/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,19 @@ where
}

/// Returns an iterator of `Id`s sorted by least recently used.
pub fn lru(&self) -> impl Iterator<Item = Result<Id>> {
self.lru
.iter()
.values()
.map(|v| v.map(Into::into).map_err(Into::into))
pub fn lru(&self) -> impl Iterator<Item = Result<(u64, Id)>> {
self.lru.iter().map(|res| {
res.map(|(atime, id)| (u64::from(&Id::from(atime)), id.into()))
.map_err(Into::into)
})
}

/// Returns the current atime.
pub fn atime(&self) -> u64 {
match self.lru.last() {
Ok(Some((atime, _))) => u64::from(&Id::from(atime)),
_ => 0,
}
}

/// Returns the data of a block and increments the access time.
Expand All @@ -152,6 +160,7 @@ where
}
tlru.insert(&atime, &id)?;
tatime.insert(&id, &atime)?;
log::debug!("get {} at {}", id, atime);
Ok(())
})
.map_err(map_tx_error)?;
Expand All @@ -167,10 +176,10 @@ where
pub fn insert(&self, block: &Block<S>) -> Result<()> {
let cid = IVec::from(block.cid().to_bytes());
let data = block.data();
let id = (&self.lookup, &self.cid, &self.data, &self.atime, &self.lru)
(&self.lookup, &self.cid, &self.data, &self.atime, &self.lru)
.transaction(|(tlookup, tcid, tdata, tatime, tlru)| {
if let Some(id) = tlookup.get(&cid)? {
return Ok(Id::from(id));
if tlookup.get(&cid)?.is_some() {
return Ok(());
}
let id: Id = tlookup.generate_id()?.into();
let atime: Id = tlru.generate_id()?.into();
Expand All @@ -179,10 +188,10 @@ where
tdata.insert(&id, data)?;
tatime.insert(&id, &atime)?;
tlru.insert(&atime, &id)?;
Ok(id)
log::debug!("insert {} at {}", id, atime);
Ok(())
})
.map_err(map_tx_error)?;
log::debug!("insert {}", id);
Ok(())
}

Expand Down Expand Up @@ -281,7 +290,7 @@ where
for res in alias.iter().values() {
let id = res?;
for id in Ids::from(closure.get(&id)?.unwrap()).iter() {
filter.add(&id)?;
filter.add(&id);
}
}
Ok(Self {
Expand Down Expand Up @@ -339,7 +348,7 @@ where
}
}
for id in closure.iter() {
filter.add(&id).unwrap();
filter.add(&id);
}
for id in prev_closure.iter() {
filter.delete(&id);
Expand All @@ -360,7 +369,7 @@ where

if res.is_err() {
for id in prev_closure.iter() {
filter.add(&id).unwrap();
filter.add(&id);
}
for id in closure.iter() {
filter.delete(&id);
Expand Down Expand Up @@ -391,9 +400,14 @@ where
}
}

/// Returns the current atime.
pub fn atime(&self) -> u64 {
self.blocks.atime()
}

/// Evicts least recently used blocks until there are no more
/// than `cache_size` number of unpinned blocks.
pub async fn evict(&self, cache_size: usize) -> Result<()> {
pub async fn evict(&self, cache_size: usize, grace_atime: u64) -> Result<()> {
let filter = self.filter.lock().await;
let nblocks = self.blocks.len();
let nlive = filter.len();
Expand All @@ -402,12 +416,15 @@ where
return Ok(());
}
let mut nevict = ncache - cache_size;
log::debug!("evicting {} blocks", nevict);
log::debug!("evicting {} blocks older than {}", nevict, grace_atime);
for res in self.blocks.lru() {
if nevict < 1 {
break;
}
let id = res?;
let (atime, id) = res?;
if atime >= grace_atime {
return Ok(());
}
if !filter.contains(&id) {
self.blocks.remove(&id)?;
self.closure.remove(&id)?;
Expand Down
34 changes: 14 additions & 20 deletions db/src/id.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use cuckoofilter::CuckooFilter;
use fnv::FnvHasher;
use ipfs_embed_core::Result;
use fnv::FnvHashMap;
use sled::IVec;
use std::collections::HashSet;
use std::hash::{BuildHasherDefault, Hasher};
Expand Down Expand Up @@ -136,38 +134,34 @@ impl<'a> Iterator for IdsIter<'a> {
}

pub struct LiveSet {
filter: CuckooFilter<FnvHasher>,
distinct: usize,
filter: FnvHashMap<Id, u64>,
}

impl LiveSet {
pub fn new() -> Self {
Self {
filter: CuckooFilter::with_capacity(cuckoofilter::DEFAULT_CAPACITY),
distinct: 0,
filter: Default::default(),
}
}

pub fn len(&self) -> usize {
self.distinct
self.filter.len()
}

pub fn contains(&self, id: &Id) -> bool {
self.filter.contains(id)
self.filter.contains_key(id)
}

pub fn add(&mut self, id: &Id) -> Result<()> {
if !self.filter.contains(id) {
self.distinct += 1;
}
self.filter.add(id)?;
Ok(())
pub fn add(&mut self, id: &Id) {
let count = self.filter.entry(id.clone()).or_default();
*count += 1;
}

pub fn delete(&mut self, id: &Id) {
self.filter.delete(id);
if !self.filter.contains(id) {
self.distinct -= 1;
if let Some((id, count)) = self.filter.remove_entry(id) {
if count > 1 {
self.filter.insert(id, count - 1);
}
}
}
}
Expand All @@ -180,8 +174,8 @@ mod tests {
fn test_live_set() {
let mut live = LiveSet::new();
let id = Id::from(0);
live.add(&id).unwrap();
live.add(&id).unwrap();
live.add(&id);
live.add(&id);
live.delete(&id);
assert_eq!(live.contains(&id), true);
live.delete(&id);
Expand Down
47 changes: 39 additions & 8 deletions db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod blocks;
mod id;

pub struct StorageService<S: StoreParams> {
db: sled::Db,
store: Aliases<S>,
cache_size: usize,
}
Expand All @@ -28,16 +29,32 @@ where
let store = Aliases::open(&db)?;
let gc = store.clone();
task::spawn(async move {
let mut atime = gc.atime();
let mut stream = interval(sweep_interval);
while let Some(()) = stream.next().await {
gc.evict(cache_size).await.ok();
let next_atime = gc.atime();
gc.evict(cache_size, atime).await.ok();
atime = next_atime;
}
});
Ok(Self { cache_size, store })
Ok(Self {
db,
cache_size,
store,
})
}

pub async fn evict(&self) -> Result<()> {
self.store.evict(self.cache_size).await
pub fn atime(&self) -> u64 {
self.store.atime()
}

pub async fn evict(&self, grace_atime: u64) -> Result<()> {
self.store.evict(self.cache_size, grace_atime).await
}

pub async fn flush(&self) -> Result<()> {
self.db.flush_async().await?;
Ok(())
}
}

Expand All @@ -61,7 +78,8 @@ where
}

async fn alias<T: AsRef<[u8]> + Send + Sync>(&self, alias: T, cid: Option<&Cid>) -> Result<()> {
self.store.alias(alias.as_ref(), cid).await
self.store.alias(alias.as_ref(), cid).await?;
self.flush().await
}

fn resolve<T: AsRef<[u8]> + Send + Sync>(&self, alias: T) -> Result<Option<Cid>> {
Expand Down Expand Up @@ -120,22 +138,35 @@ mod tests {
];
store.insert(&blocks[0]).unwrap();
store.insert(&blocks[1]).unwrap();
store.evict().await.unwrap();
store.evict(store.atime() + 1).await.unwrap();
assert_unpinned!(&store, &blocks[0]);
assert_unpinned!(&store, &blocks[1]);
store.insert(&blocks[2]).unwrap();
store.evict().await.unwrap();
store.evict(store.atime() + 1).await.unwrap();
assert_evicted!(&store, &blocks[0]);
assert_unpinned!(&store, &blocks[1]);
assert_unpinned!(&store, &blocks[2]);
store.get(&blocks[1]).unwrap();
store.insert(&blocks[3]).unwrap();
store.evict().await.unwrap();
store.evict(store.atime() + 1).await.unwrap();
assert_unpinned!(&store, &blocks[1]);
assert_evicted!(&store, &blocks[2]);
assert_unpinned!(&store, &blocks[3]);
}

#[async_std::test]
async fn test_grace_period() {
env_logger::try_init().ok();
let config = sled::Config::new().temporary(true);
let store = StorageService::open(&config, 0, Duration::from_millis(10000)).unwrap();
let blocks = [create_block(&ipld!(0))];
store.insert(&blocks[0]).unwrap();
store.evict(0).await.unwrap();
assert_unpinned!(&store, &blocks[0]);
store.evict(store.atime() + 1).await.unwrap();
assert_evicted!(&store, &blocks[0]);
}

#[async_std::test]
#[allow(clippy::many_single_char_names)]
async fn test_store_unpin() {
Expand Down

0 comments on commit d752faf

Please sign in to comment.