Skip to content

Commit

Permalink
Implement SledStorage transaction timeout support, (gluesql#297)
Browse files Browse the repository at this point in the history
Now SledStorage users can set transaction timeout.
Transaction terminates when either ROLLBACK | COMMIT are executed by users or transaction time limit has exceeded.
  • Loading branch information
panarch authored Aug 20, 2021
1 parent 603ee49 commit bfb89fb
Show file tree
Hide file tree
Showing 11 changed files with 937 additions and 337 deletions.
115 changes: 77 additions & 38 deletions src/storages/sled_storage/alter_table.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,33 @@
#![cfg(feature = "alter-table")]

use {
super::{error::err_into, error::StorageError, fetch_schema, key, lock, SledStorage, Snapshot},
super::{
error::err_into,
fetch_schema, key,
lock::{self, LockAcquired},
transaction::TxPayload,
SledStorage, Snapshot,
},
crate::{
ast::ColumnDef, executor::evaluate_stateless, schema::ColumnDefExt, utils::Vector,
AlterTable, AlterTableError, MutResult, Result, Row, Schema, Value,
},
async_trait::async_trait,
sled::transaction::{ConflictableTransactionError, TransactionError},
sled::transaction::ConflictableTransactionError,
std::{iter::once, str},
};

macro_rules! try_self {
($self: expr, $expr: expr) => {
match $expr {
Err(e) => {
return Err(($self, e.into()));
return Err(($self, e));
}
Ok(v) => v,
}
};
}

macro_rules! transaction {
($self: expr, $expr: expr) => {{
let result = $self.tree.transaction($expr).map_err(|e| match e {
TransactionError::Abort(e) => e,
TransactionError::Storage(e) => StorageError::Sled(e).into(),
});

match result {
Ok(_) => Ok(($self, ())),
Err(e) => Err(($self, e)),
}
}};
}

#[async_trait(?Send)]
impl AlterTable for SledStorage {
async fn rename_schema(self, table_name: &str, new_table_name: &str) -> MutResult<Self, ()> {
Expand All @@ -48,9 +40,14 @@ impl AlterTable for SledStorage {
let items = try_self!(self, items);

let state = &self.state;

transaction!(self, move |tree| {
let (txid, autocommit) = lock::acquire(tree, state)?;
let tx_timeout = self.tx_timeout;
let tx_result = self.tree.transaction(move |tree| {
let (txid, autocommit) = match lock::acquire(tree, state, tx_timeout)? {
LockAcquired::Success { txid, autocommit } => (txid, autocommit),
LockAcquired::RollbackAndRetry { lock_txid } => {
return Ok(TxPayload::RollbackAndRetry(lock_txid));
}
};

let (old_schema_key, schema_snapshot) = fetch_schema(tree, table_name)?;
let schema_snapshot = schema_snapshot
Expand Down Expand Up @@ -134,8 +131,14 @@ impl AlterTable for SledStorage {
tree.insert(temp_new_key, new_schema_key.as_bytes())?;
}

Ok(())
})
Ok(TxPayload::Success)
});

match self.check_retry(tx_result) {
Ok(true) => self.rename_schema(table_name, new_table_name).await,
Ok(false) => Ok((self, ())),
Err(e) => Err((self, e)),
}
}

async fn rename_column(
Expand All @@ -145,9 +148,14 @@ impl AlterTable for SledStorage {
new_column_name: &str,
) -> MutResult<Self, ()> {
let state = &self.state;

transaction!(self, move |tree| {
let (txid, autocommit) = lock::acquire(tree, state)?;
let tx_timeout = self.tx_timeout;
let tx_result = self.tree.transaction(move |tree| {
let (txid, autocommit) = match lock::acquire(tree, state, tx_timeout)? {
LockAcquired::Success { txid, autocommit } => (txid, autocommit),
LockAcquired::RollbackAndRetry { lock_txid } => {
return Ok(TxPayload::RollbackAndRetry(lock_txid));
}
};

let (schema_key, snapshot) = fetch_schema(tree, table_name)?;
let snapshot = snapshot
Expand Down Expand Up @@ -197,8 +205,17 @@ impl AlterTable for SledStorage {
tree.insert(temp_key, schema_key.as_bytes())?;
}

Ok(())
})
Ok(TxPayload::Success)
});

match self.check_retry(tx_result) {
Ok(true) => {
self.rename_column(table_name, old_column_name, new_column_name)
.await
}
Ok(false) => Ok((self, ())),
Err(e) => Err((self, e)),
}
}

async fn add_column(self, table_name: &str, column_def: &ColumnDef) -> MutResult<Self, ()> {
Expand All @@ -211,9 +228,14 @@ impl AlterTable for SledStorage {
let items = try_self!(self, items);

let state = &self.state;

transaction!(self, move |tree| {
let (txid, autocommit) = lock::acquire(tree, state)?;
let tx_timeout = self.tx_timeout;
let tx_result = self.tree.transaction(move |tree| {
let (txid, autocommit) = match lock::acquire(tree, state, tx_timeout)? {
LockAcquired::Success { txid, autocommit } => (txid, autocommit),
LockAcquired::RollbackAndRetry { lock_txid } => {
return Ok(TxPayload::RollbackAndRetry(lock_txid));
}
};

let (schema_key, schema_snapshot) = fetch_schema(tree, table_name)?;
let schema_snapshot = schema_snapshot
Expand Down Expand Up @@ -309,8 +331,14 @@ impl AlterTable for SledStorage {
tree.insert(temp_key, schema_key.as_bytes())?;
}

Ok(())
})
Ok(TxPayload::Success)
});

match self.check_retry(tx_result) {
Ok(true) => self.add_column(table_name, column_def).await,
Ok(false) => Ok((self, ())),
Err(e) => Err((self, e)),
}
}

async fn drop_column(
Expand All @@ -328,9 +356,14 @@ impl AlterTable for SledStorage {
let items = try_self!(self, items);

let state = &self.state;

transaction!(self, move |tree| {
let (txid, autocommit) = lock::acquire(tree, state)?;
let tx_timeout = self.tx_timeout;
let tx_result = self.tree.transaction(move |tree| {
let (txid, autocommit) = match lock::acquire(tree, state, tx_timeout)? {
LockAcquired::Success { txid, autocommit } => (txid, autocommit),
LockAcquired::RollbackAndRetry { lock_txid } => {
return Ok(TxPayload::RollbackAndRetry(lock_txid));
}
};

let (schema_key, schema_snapshot) = fetch_schema(tree, table_name)?;
let schema_snapshot = schema_snapshot
Expand All @@ -352,7 +385,7 @@ impl AlterTable for SledStorage {
let column_index = match (column_index, if_exists) {
(Some(index), _) => index,
(None, true) => {
return Ok(());
return Ok(TxPayload::Success);
}
(None, false) => {
return Err(
Expand Down Expand Up @@ -418,7 +451,13 @@ impl AlterTable for SledStorage {
tree.insert(temp_key, schema_key.as_bytes())?;
}

Ok(())
})
Ok(TxPayload::Success)
});

match self.check_retry(tx_result) {
Ok(true) => self.drop_column(table_name, column_name, if_exists).await,
Ok(false) => Ok((self, ())),
Err(e) => Err((self, e)),
}
}
}
27 changes: 16 additions & 11 deletions src/storages/sled_storage/error.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use std::str;
use thiserror::Error as ThisError;
use {
crate::{Error, IndexError},
sled::transaction::TransactionError as SledTransactionError,
std::{str, time},
thiserror::Error as ThisError,
};

#[cfg(feature = "alter-table")]
use crate::AlterTableError;

#[cfg(feature = "index")]
use crate::IndexError;

use crate::Error;

#[derive(ThisError, Debug)]
pub enum StorageError {
#[cfg(feature = "alter-table")]
#[error(transparent)]
AlterTable(#[from] AlterTableError),

#[cfg(feature = "index")]
#[error(transparent)]
Index(#[from] IndexError),

Expand All @@ -25,6 +22,8 @@ pub enum StorageError {
Bincode(#[from] bincode::Error),
#[error(transparent)]
Str(#[from] str::Utf8Error),
#[error(transparent)]
SystemTime(#[from] time::SystemTimeError),
}

impl From<StorageError> for Error {
Expand All @@ -35,11 +34,10 @@ impl From<StorageError> for Error {
Sled(e) => Error::Storage(Box::new(e)),
Bincode(e) => Error::Storage(e),
Str(e) => Error::Storage(Box::new(e)),
SystemTime(e) => Error::Storage(Box::new(e)),

#[cfg(feature = "alter-table")]
AlterTable(e) => e.into(),

#[cfg(feature = "index")]
Index(e) => e.into(),
}
}
Expand All @@ -54,3 +52,10 @@ where

e
}

pub fn tx_err_into(e: SledTransactionError<Error>) -> Error {
match e {
SledTransactionError::Abort(e) => e,
SledTransactionError::Storage(e) => StorageError::Sled(e).into(),
}
}
45 changes: 29 additions & 16 deletions src/storages/sled_storage/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@ use {
data::{Row, Schema},
result::Result,
},
std::time::{SystemTime, UNIX_EPOCH},
};

impl SledStorage {
pub fn gc(&self) -> Result<()> {
let mut lock: Lock = self
.tree
.get("lock/")
.map_err(err_into)?
.map(|l| bincode::deserialize(&l))
.transpose()
.map_err(err_into)?
.unwrap_or_default();

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(err_into)?
.as_millis();

let txids = self
.tree
.scan_prefix("tx_data/")
Expand All @@ -20,9 +35,19 @@ impl SledStorage {
.map_err(err_into)?
.map_err(err_into)
})
.take_while(|tx_data| match tx_data {
Ok(TxData { alive, .. }) => !alive,
Err(_) => false,
.take_while(|tx_data| match (tx_data, self.tx_timeout) {
(Ok(TxData { alive, .. }), None) => !alive,
(Ok(tx_data), Some(tx_timeout)) => {
let TxData {
txid,
alive,
created_at,
} = tx_data;

(!alive || now - created_at >= tx_timeout)
&& Some(txid) != lock.lock_txid.as_ref()
}
(Err(_), _) => false,
})
.map(|tx_data| tx_data.map(|TxData { txid, .. }| txid))
.collect::<Result<Vec<u64>>>()?;
Expand All @@ -34,19 +59,7 @@ impl SledStorage {
}
};

let Lock { lock_txid, .. } = self
.tree
.get("lock/")
.map_err(err_into)?
.map(|l| bincode::deserialize(&l))
.transpose()
.map_err(err_into)?
.unwrap_or_default();

let lock = Lock {
lock_txid,
gc_txid: Some(*max_txid),
};
lock.gc_txid = Some(*max_txid);

bincode::serialize(&lock)
.map(|lock| self.tree.insert("lock/", lock))
Expand Down
8 changes: 5 additions & 3 deletions src/storages/sled_storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ impl Index<IVec> for SledStorage {
}
};

let txid = match self.state {
State::Transaction { txid, .. } => txid,
let (txid, created_at) = match self.state {
State::Transaction {
txid, created_at, ..
} => (txid, created_at),
State::Idle => {
return Err(Error::StorageMsg(
"conflict - scan_indexed_data failed, lock does not exist".to_owned(),
));
}
};
let lock_txid = lock::fetch(&self.tree, txid)?;
let lock_txid = lock::fetch(&self.tree, txid, created_at, self.tx_timeout)?;

let tree = self.tree.clone();
let flat_map = move |keys: Result<IVec>| {
Expand Down
Loading

0 comments on commit bfb89fb

Please sign in to comment.