Skip to content

Commit

Permalink
Merge branch 'release/0.7.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Mar 29, 2023
2 parents 8437529 + 6ce1757 commit c7e6056
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 148 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rustus"
version = "0.7.0"
version = "0.7.1"
edition = "2021"
description = "TUS protocol implementation written in Rust."
keywords = ["tus", "server", "actix-web"]
Expand Down
10 changes: 7 additions & 3 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,21 +264,25 @@ Redis db is a good way to store information.
you must provide connection string for master Redis server.
Since rustus need to have latest information and it writes a lot.

`--info-db-dsn` - connection string for your Redis database.
It's required if `redis-info-storage` is chosen.
Configuration parameters:
* `--info-db-dsn` - connection string for your Redis database.
It's required if `redis-info-storage` is chosen.
* `--redis-info-expiration` - number of seconds when key will expire.

=== "CLI"

``` bash
rustus --info-storage "redis-info-storage" \
--info-db-dsn "redis://localhost/0"
--info-db-dsn "redis://localhost/0" \
--redis-info-expiration 100
```

=== "ENV"

``` bash
export RUSTUS_INFO_STORAGE="redis-info-storage"
export RUSTUS_INFO_DB_DSN="redis://localhost"
export RUSTUS_REDIS_INFO_EXPIRATION="100"

rustus
```
Expand Down
16 changes: 14 additions & 2 deletions docs/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,11 @@ Configuration parameters:
* `--hooks-amqp-durable-exchange` - adds durability to created exchange;
* `--hooks-amqp-durable-queues` - adds durability to created;
* `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html);
* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange.
* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange;
* `--hooks-amqp-connection-pool-size` - maximum number of opened connections to RabbitMQ;
* `--hooks-amqp-channel-pool-size` - maximum number of opened channels for each connection to RabbitMQ;
* `--hooks-amqp-idle-connection-timeout` - timeout for idle connection in seconds. If the connection isn't used, it's dropped;
* `--hooks-amqp-idle-channels-timeout` - timeout for idle channels in seconds. If the channel isn't used, it's dropped.

If no hooks_amqp_routing_key specified, rustus will send all messages with
different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on.
Expand All @@ -917,7 +921,11 @@ Otherwise, it will use only one routing key and only one queue!
--hooks-amqp-declare-queues \
--hooks-amqp-durable-exchange \
--hooks-amqp-durable-queues \
--hooks-amqp-celery
--hooks-amqp-celery \
--hooks-amqp-connection-pool-size 10 \
--hooks-amqp-channel-pool-size 10 \
--hooks-amqp-idle-connection-timeout 20 \
--hooks-amqp-idle-channels-timeout 10
```

=== "ENV"
Expand All @@ -933,6 +941,10 @@ Otherwise, it will use only one routing key and only one queue!
export RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE="true"
export RUSTUS_HOOKS_AMQP_DURABLE_QUEUES="true"
export RUSTUS_HOOKS_AMQP_CELERY="true"
export RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE="10"
export RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE="10"
export RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT="20"
export RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT="10"

rustus
```
Expand Down
107 changes: 73 additions & 34 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,44 +146,15 @@ pub struct InfoStoreOptions {
env = "RUSTUS_INFO_DB_DSN"
)]
pub info_db_dsn: Option<String>,
}

#[cfg(feature = "redis_info_storage")]
#[arg(long, env = "RUSTUS_REDIS_INFO_EXPIRATION")]
pub redis_info_expiration: Option<usize>,
}
#[derive(Parser, Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct NotificationsOptions {
/// Notifications format.
///
/// This format will be used in all
/// messages about hooks.
#[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")]
pub hooks_format: Format,

/// Enabled hooks for notifications.
#[arg(
long,
default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish",
env = "RUSTUS_HOOKS",
use_value_delimiter = true
)]
pub hooks: Vec<Hook>,

/// Use this option if you use rustus
/// behind any proxy. Like Nginx or Traefik.
#[arg(long, env = "RUSTUS_BEHIND_PROXY")]
pub behind_proxy: bool,

/// List of URLS to send webhooks to.
#[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)]
pub hooks_http_urls: Vec<String>,

// List of headers to forward from client.
#[arg(
long,
env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS",
use_value_delimiter = true
)]
pub hooks_http_proxy_headers: Vec<String>,

pub struct AMQPHooksOptions {
/// Url for AMQP server.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")]
Expand Down Expand Up @@ -239,6 +210,71 @@ pub struct NotificationsOptions {
)]
pub hooks_amqp_queues_prefix: String,

/// Maximum number of connections for RabbitMQ.
#[cfg(feature = "amqp_notifier")]
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_connection_pool_size: u32,

/// Maximum number of opened channels for each connection.
#[cfg(feature = "amqp_notifier")]
#[arg(
long,
env = "RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE",
default_value = "10"
)]
pub hooks_amqp_channel_pool_size: u32,

/// After this amount of time the connection will be dropped.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT")]
pub hooks_amqp_idle_connection_timeout: Option<u64>,

/// After this amount of time in seconds, the channel will be closed.
#[cfg(feature = "amqp_notifier")]
#[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT")]
pub hooks_amqp_idle_channels_timeout: Option<u64>,
}

#[derive(Parser, Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct NotificationsOptions {
/// Notifications format.
///
/// This format will be used in all
/// messages about hooks.
#[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")]
pub hooks_format: Format,

/// Enabled hooks for notifications.
#[arg(
long,
default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish",
env = "RUSTUS_HOOKS",
use_value_delimiter = true
)]
pub hooks: Vec<Hook>,

/// Use this option if you use rustus
/// behind any proxy. Like Nginx or Traefik.
#[arg(long, env = "RUSTUS_BEHIND_PROXY")]
pub behind_proxy: bool,

/// List of URLS to send webhooks to.
#[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)]
pub hooks_http_urls: Vec<String>,

// List of headers to forward from client.
#[arg(
long,
env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS",
use_value_delimiter = true
)]
pub hooks_http_proxy_headers: Vec<String>,

/// Directory for executable hook files.
/// This parameter is used to call executables from dir.
#[arg(long, env = "RUSTUS_HOOKS_DIR")]
Expand All @@ -248,6 +284,9 @@ pub struct NotificationsOptions {
/// notifying about upload status.
#[arg(long, env = "RUSTUS_HOOKS_FILE")]
pub hooks_file: Option<String>,

#[command(flatten)]
pub amqp_hook_opts: AMQPHooksOptions,
}

#[derive(Debug, Parser, Clone)]
Expand Down
1 change: 1 addition & 0 deletions src/info_storages/models/available_info_storages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl AvailableInfoStores {
.clone()
.unwrap()
.as_str(),
config.info_storage_opts.redis_info_expiration,
)
.await?,
)),
Expand Down
45 changes: 26 additions & 19 deletions src/info_storages/redis_info_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use crate::{
#[derive(Clone)]
pub struct RedisStorage {
pool: Pool<RedisConnectionManager>,
expiration: Option<usize>,
}

impl RedisStorage {
#[allow(clippy::unused_async)]
pub async fn new(db_dsn: &str) -> RustusResult<Self> {
pub async fn new(db_dsn: &str, expiration: Option<usize>) -> RustusResult<Self> {
let manager = RedisConnectionManager::new(db_dsn)?;
let pool = bb8::Pool::builder().max_size(100).build(manager).await?;
Ok(Self { pool })
Ok(Self { pool, expiration })
}
}

Expand All @@ -33,10 +34,14 @@ impl InfoStorage for RedisStorage {

async fn set_info(&self, file_info: &FileInfo, _create: bool) -> RustusResult<()> {
let mut conn = self.pool.get().await?;
redis::cmd("SET")
let mut cmd = redis::cmd("SET");
let mut cmd = cmd
.arg(file_info.id.as_str())
.arg(file_info.json().await?.as_str())
.query_async::<Connection, String>(&mut conn)
.arg(file_info.json().await?.as_str());
if let Some(expiration) = self.expiration.as_ref() {
cmd = cmd.arg("EX").arg(expiration);
}
cmd.query_async::<Connection, String>(&mut conn)
.await
.map_err(RustusError::from)?;
Ok(())
Expand Down Expand Up @@ -76,7 +81,7 @@ mod tests {

async fn get_storage() -> RedisStorage {
let redis_url = std::env::var("TEST_REDIS_URL").unwrap();
RedisStorage::new(redis_url.as_str()).await.unwrap()
RedisStorage::new(redis_url.as_str(), None).await.unwrap()
}

async fn get_redis() -> redis::aio::Connection {
Expand All @@ -103,7 +108,9 @@ mod tests {

#[actix_rt::test]
async fn no_connection() {
let info_storage = RedisStorage::new("redis://unknonwn_url/0").await.unwrap();
let info_storage = RedisStorage::new("redis://unknonwn_url/0", None)
.await
.unwrap();
let file_info = FileInfo::new_test();
let res = info_storage.set_info(&file_info, true).await;
assert!(res.is_err());
Expand All @@ -119,22 +126,22 @@ mod tests {
}

#[actix_rt::test]
async fn deletion_success() {
async fn expiration() {
let info_storage = get_storage().await;
let mut redis = get_redis().await;
let res = info_storage.remove_info("unknown").await;
let res = info_storage
.get_info(uuid::Uuid::new_v4().to_string().as_str())
.await;
assert!(res.is_err());
}

#[actix_rt::test]
async fn deletion_success() {
let mut info_storage = get_storage().await;
info_storage.expiration = Some(1);
let mut redis = get_redis().await;
let file_info = FileInfo::new_test();
info_storage.set_info(&file_info, true).await.unwrap();
assert!(redis
.get::<&str, Option<String>>(file_info.id.as_str())
.await
.unwrap()
.is_some());
info_storage
.remove_info(file_info.id.as_str())
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
assert!(redis
.get::<&str, Option<String>>(file_info.id.as_str())
.await
Expand Down
Loading

0 comments on commit c7e6056

Please sign in to comment.