diff --git a/Cargo.lock b/Cargo.lock index 2e81a78..ad7fbae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3164,7 +3164,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.7.0" +version = "0.7.1" dependencies = [ "actix-cors", "actix-files", diff --git a/Cargo.toml b/Cargo.toml index 7f355e9..20cf736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.7.0" +version = "0.7.1" edition = "2021" description = "TUS protocol implementation written in Rust." keywords = ["tus", "server", "actix-web"] diff --git a/docs/configuration.md b/docs/configuration.md index 22e8ef7..05ee32f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -264,14 +264,17 @@ Redis db is a good way to store information. you must provide connection string for master Redis server. Since rustus need to have latest information and it writes a lot. -`--info-db-dsn` - connection string for your Redis database. -It's required if `redis-info-storage` is chosen. +Configuration parameters: +* `--info-db-dsn` - connection string for your Redis database. + It's required if `redis-info-storage` is chosen. +* `--redis-info-expiration` - number of seconds when key will expire. === "CLI" ``` bash rustus --info-storage "redis-info-storage" \ - --info-db-dsn "redis://localhost/0" + --info-db-dsn "redis://localhost/0" \ + --redis-info-expiration 100 ``` === "ENV" @@ -279,6 +282,7 @@ It's required if `redis-info-storage` is chosen. ``` bash export RUSTUS_INFO_STORAGE="redis-info-storage" export RUSTUS_INFO_DB_DSN="redis://localhost" + export RUSTUS_REDIS_INFO_EXPIRATION="100" rustus ``` diff --git a/docs/hooks.md b/docs/hooks.md index d89ad3d..8bc552b 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -894,7 +894,11 @@ Configuration parameters: * `--hooks-amqp-durable-exchange` - adds durability to created exchange; * `--hooks-amqp-durable-queues` - adds durability to created; * `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html); -* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange. +* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange; +* `--hooks-amqp-connection-pool-size` - maximum number of opened connections to RabbitMQ; +* `--hooks-amqp-channel-pool-size` - maximum number of opened channels for each connection to RabbitMQ; +* `--hooks-amqp-idle-connection-timeout` - timeout for idle connection in seconds. If the connection isn't used, it's dropped; +* `--hooks-amqp-idle-channels-timeout` - timeout for idle channels in seconds. If the channel isn't used, it's dropped. If no hooks_amqp_routing_key specified, rustus will send all messages with different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. @@ -917,7 +921,11 @@ Otherwise, it will use only one routing key and only one queue! --hooks-amqp-declare-queues \ --hooks-amqp-durable-exchange \ --hooks-amqp-durable-queues \ - --hooks-amqp-celery + --hooks-amqp-celery \ + --hooks-amqp-connection-pool-size 10 \ + --hooks-amqp-channel-pool-size 10 \ + --hooks-amqp-idle-connection-timeout 20 \ + --hooks-amqp-idle-channels-timeout 10 ``` === "ENV" @@ -933,6 +941,10 @@ Otherwise, it will use only one routing key and only one queue! export RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE="true" export RUSTUS_HOOKS_AMQP_DURABLE_QUEUES="true" export RUSTUS_HOOKS_AMQP_CELERY="true" + export RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE="10" + export RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE="10" + export RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT="20" + export RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT="10" rustus ``` diff --git a/src/config.rs b/src/config.rs index 4481369..99ae95b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -146,44 +146,15 @@ pub struct InfoStoreOptions { env = "RUSTUS_INFO_DB_DSN" )] pub info_db_dsn: Option, -} + #[cfg(feature = "redis_info_storage")] + #[arg(long, env = "RUSTUS_REDIS_INFO_EXPIRATION")] + pub redis_info_expiration: Option, +} #[derive(Parser, Debug, Clone)] #[allow(clippy::struct_excessive_bools)] -pub struct NotificationsOptions { - /// Notifications format. - /// - /// This format will be used in all - /// messages about hooks. - #[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")] - pub hooks_format: Format, - - /// Enabled hooks for notifications. - #[arg( - long, - default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish", - env = "RUSTUS_HOOKS", - use_value_delimiter = true - )] - pub hooks: Vec, - - /// Use this option if you use rustus - /// behind any proxy. Like Nginx or Traefik. - #[arg(long, env = "RUSTUS_BEHIND_PROXY")] - pub behind_proxy: bool, - - /// List of URLS to send webhooks to. - #[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)] - pub hooks_http_urls: Vec, - - // List of headers to forward from client. - #[arg( - long, - env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", - use_value_delimiter = true - )] - pub hooks_http_proxy_headers: Vec, +pub struct AMQPHooksOptions { /// Url for AMQP server. #[cfg(feature = "amqp_notifier")] #[arg(long, env = "RUSTUS_HOOKS_AMQP_URL")] @@ -239,6 +210,71 @@ pub struct NotificationsOptions { )] pub hooks_amqp_queues_prefix: String, + /// Maximum number of connections for RabbitMQ. + #[cfg(feature = "amqp_notifier")] + #[arg( + long, + env = "RUSTUS_HOOKS_AMQP_CONNECTION_POOL_SIZE", + default_value = "10" + )] + pub hooks_amqp_connection_pool_size: u32, + + /// Maximum number of opened channels for each connection. + #[cfg(feature = "amqp_notifier")] + #[arg( + long, + env = "RUSTUS_HOOKS_AMQP_CHANNEL_POOL_SIZE", + default_value = "10" + )] + pub hooks_amqp_channel_pool_size: u32, + + /// After this amount of time the connection will be dropped. + #[cfg(feature = "amqp_notifier")] + #[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CONNECTION_TIMEOUT")] + pub hooks_amqp_idle_connection_timeout: Option, + + /// After this amount of time in seconds, the channel will be closed. + #[cfg(feature = "amqp_notifier")] + #[arg(long, env = "RUSTUS_HOOKS_AMQP_IDLE_CHANNELS_TIMEOUT")] + pub hooks_amqp_idle_channels_timeout: Option, +} + +#[derive(Parser, Debug, Clone)] +#[allow(clippy::struct_excessive_bools)] +pub struct NotificationsOptions { + /// Notifications format. + /// + /// This format will be used in all + /// messages about hooks. + #[arg(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")] + pub hooks_format: Format, + + /// Enabled hooks for notifications. + #[arg( + long, + default_value = "pre-create,post-create,post-receive,pre-terminate,post-terminate,post-finish", + env = "RUSTUS_HOOKS", + use_value_delimiter = true + )] + pub hooks: Vec, + + /// Use this option if you use rustus + /// behind any proxy. Like Nginx or Traefik. + #[arg(long, env = "RUSTUS_BEHIND_PROXY")] + pub behind_proxy: bool, + + /// List of URLS to send webhooks to. + #[arg(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_value_delimiter = true)] + pub hooks_http_urls: Vec, + + // List of headers to forward from client. + #[arg( + long, + env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", + use_value_delimiter = true + )] + pub hooks_http_proxy_headers: Vec, + /// Directory for executable hook files. /// This parameter is used to call executables from dir. #[arg(long, env = "RUSTUS_HOOKS_DIR")] @@ -248,6 +284,9 @@ pub struct NotificationsOptions { /// notifying about upload status. #[arg(long, env = "RUSTUS_HOOKS_FILE")] pub hooks_file: Option, + + #[command(flatten)] + pub amqp_hook_opts: AMQPHooksOptions, } #[derive(Debug, Parser, Clone)] diff --git a/src/info_storages/models/available_info_storages.rs b/src/info_storages/models/available_info_storages.rs index cae48d7..4e83c1c 100644 --- a/src/info_storages/models/available_info_storages.rs +++ b/src/info_storages/models/available_info_storages.rs @@ -61,6 +61,7 @@ impl AvailableInfoStores { .clone() .unwrap() .as_str(), + config.info_storage_opts.redis_info_expiration, ) .await?, )), diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs index 4e7b946..9539df1 100644 --- a/src/info_storages/redis_info_storage.rs +++ b/src/info_storages/redis_info_storage.rs @@ -14,14 +14,15 @@ use crate::{ #[derive(Clone)] pub struct RedisStorage { pool: Pool, + expiration: Option, } impl RedisStorage { #[allow(clippy::unused_async)] - pub async fn new(db_dsn: &str) -> RustusResult { + pub async fn new(db_dsn: &str, expiration: Option) -> RustusResult { let manager = RedisConnectionManager::new(db_dsn)?; let pool = bb8::Pool::builder().max_size(100).build(manager).await?; - Ok(Self { pool }) + Ok(Self { pool, expiration }) } } @@ -33,10 +34,14 @@ impl InfoStorage for RedisStorage { async fn set_info(&self, file_info: &FileInfo, _create: bool) -> RustusResult<()> { let mut conn = self.pool.get().await?; - redis::cmd("SET") + let mut cmd = redis::cmd("SET"); + let mut cmd = cmd .arg(file_info.id.as_str()) - .arg(file_info.json().await?.as_str()) - .query_async::(&mut conn) + .arg(file_info.json().await?.as_str()); + if let Some(expiration) = self.expiration.as_ref() { + cmd = cmd.arg("EX").arg(expiration); + } + cmd.query_async::(&mut conn) .await .map_err(RustusError::from)?; Ok(()) @@ -76,7 +81,7 @@ mod tests { async fn get_storage() -> RedisStorage { let redis_url = std::env::var("TEST_REDIS_URL").unwrap(); - RedisStorage::new(redis_url.as_str()).await.unwrap() + RedisStorage::new(redis_url.as_str(), None).await.unwrap() } async fn get_redis() -> redis::aio::Connection { @@ -103,7 +108,9 @@ mod tests { #[actix_rt::test] async fn no_connection() { - let info_storage = RedisStorage::new("redis://unknonwn_url/0").await.unwrap(); + let info_storage = RedisStorage::new("redis://unknonwn_url/0", None) + .await + .unwrap(); let file_info = FileInfo::new_test(); let res = info_storage.set_info(&file_info, true).await; assert!(res.is_err()); @@ -119,22 +126,22 @@ mod tests { } #[actix_rt::test] - async fn deletion_success() { + async fn expiration() { let info_storage = get_storage().await; - let mut redis = get_redis().await; - let res = info_storage.remove_info("unknown").await; + let res = info_storage + .get_info(uuid::Uuid::new_v4().to_string().as_str()) + .await; assert!(res.is_err()); + } + + #[actix_rt::test] + async fn deletion_success() { + let mut info_storage = get_storage().await; + info_storage.expiration = Some(1); + let mut redis = get_redis().await; let file_info = FileInfo::new_test(); info_storage.set_info(&file_info, true).await.unwrap(); - assert!(redis - .get::<&str, Option>(file_info.id.as_str()) - .await - .unwrap() - .is_some()); - info_storage - .remove_info(file_info.id.as_str()) - .await - .unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; assert!(redis .get::<&str, Option>(file_info.id.as_str()) .await diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index 8a08aa3..4272b29 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -1,4 +1,5 @@ use crate::{ + config::AMQPHooksOptions, notifiers::{Hook, Notifier}, RustusResult, }; @@ -9,8 +10,9 @@ use bb8_lapin::LapinConnectionManager; use lapin::{ options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions}, types::{AMQPValue, FieldTable, LongString}, - BasicProperties, ConnectionProperties, ExchangeKind, + BasicProperties, ChannelState, ConnectionProperties, ExchangeKind, }; +use std::time::Duration; use strum::IntoEnumIterator; #[allow(clippy::struct_excessive_bools)] @@ -25,7 +27,7 @@ pub struct DeclareOptions { #[derive(Clone)] pub struct AMQPNotifier { exchange_name: String, - pool: Pool, + channel_pool: Pool, queues_prefix: String, exchange_kind: String, routing_key: Option, @@ -33,28 +35,99 @@ pub struct AMQPNotifier { celery: bool, } +/// Channel manager for lapin channels. +/// +/// This manager is used with bb8 pool, +/// so it maintains opened channels for every connections. +/// +/// This pool uses connection pool +/// and issues new connections from it. +#[derive(Clone)] +pub struct ChannelPool { + pool: Pool, +} + +impl ChannelPool { + pub fn new(pool: Pool) -> Self { + ChannelPool { pool } + } +} + +/// ManagerConnection for ChannelPool. +/// +/// This manager helps you maintain opened channels. +#[async_trait::async_trait] +impl bb8::ManageConnection for ChannelPool { + type Connection = lapin::Channel; + type Error = lapin::Error; + + async fn connect(&self) -> Result { + Ok(self + .pool + .get() + .await + .map_err(|err| match err { + bb8::RunError::TimedOut => lapin::Error::ChannelsLimitReached, + bb8::RunError::User(user_err) => user_err, + })? + .create_channel() + .await?) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let valid_states = vec![ChannelState::Initial, ChannelState::Connected]; + if valid_states.contains(&conn.status().state()) { + Ok(()) + } else { + Err(lapin::Error::InvalidChannel(conn.id())) + } + } + + fn has_broken(&self, conn: &mut Self::Connection) -> bool { + let broken_states = vec![ChannelState::Closed, ChannelState::Error]; + broken_states.contains(&conn.status().state()) + } +} + impl AMQPNotifier { #[allow(clippy::fn_params_excessive_bools)] - pub async fn new( - amqp_url: &str, - exchange: &str, - queues_prefix: &str, - exchange_kind: &str, - routing_key: Option, - declare_options: DeclareOptions, - celery: bool, - ) -> RustusResult { - let manager = LapinConnectionManager::new(amqp_url, ConnectionProperties::default()); - let pool = bb8::Pool::builder().build(manager).await?; + pub async fn new(options: AMQPHooksOptions) -> RustusResult { + let manager = LapinConnectionManager::new( + options.hooks_amqp_url.unwrap().as_str(), + ConnectionProperties::default(), + ); + let connection_pool = bb8::Pool::builder() + .idle_timeout( + options + .hooks_amqp_idle_connection_timeout + .map(Duration::from_secs), + ) + .max_size(options.hooks_amqp_connection_pool_size) + .build(manager) + .await?; + let channel_pool = bb8::Pool::builder() + .idle_timeout( + options + .hooks_amqp_idle_channels_timeout + .map(Duration::from_secs), + ) + .max_size(options.hooks_amqp_channel_pool_size) + .build(ChannelPool::new(connection_pool)) + .await?; Ok(Self { - pool, - celery, - routing_key, - declare_options, - exchange_kind: exchange_kind.into(), - exchange_name: exchange.into(), - queues_prefix: queues_prefix.into(), + channel_pool, + celery: options.hooks_amqp_celery, + routing_key: options.hooks_amqp_routing_key, + declare_options: DeclareOptions { + declare_exchange: options.hooks_amqp_declare_exchange, + durable_exchange: options.hooks_amqp_durable_exchange, + declare_queues: options.hooks_amqp_declare_queues, + durable_queues: options.hooks_amqp_durable_queues, + }, + exchange_kind: options.hooks_amqp_exchange_kind, + exchange_name: options.hooks_amqp_exchange, + queues_prefix: options.hooks_amqp_queues_prefix, }) } @@ -74,7 +147,7 @@ impl AMQPNotifier { #[async_trait(?Send)] impl Notifier for AMQPNotifier { async fn prepare(&mut self) -> RustusResult<()> { - let chan = self.pool.get().await?.create_channel().await?; + let chan = self.channel_pool.get().await?; if self.declare_options.declare_exchange { chan.exchange_declare( self.exchange_name.as_str(), @@ -118,7 +191,7 @@ impl Notifier for AMQPNotifier { hook: Hook, _header_map: &HeaderMap, ) -> RustusResult<()> { - let chan = self.pool.get().await?.create_channel().await?; + let chan = self.channel_pool.get().await?; let queue = self.get_queue_name(hook); let routing_key = self.routing_key.as_ref().unwrap_or(&queue); let payload = if self.celery { @@ -162,20 +235,22 @@ mod tests { async fn get_notifier() -> AMQPNotifier { let amqp_url = std::env::var("TEST_AMQP_URL").unwrap(); - let mut notifier = AMQPNotifier::new( - amqp_url.as_str(), - uuid::Uuid::new_v4().to_string().as_str(), - uuid::Uuid::new_v4().to_string().as_str(), - "topic", - None, - DeclareOptions { - declare_exchange: true, - declare_queues: true, - durable_queues: false, - durable_exchange: false, - }, - true, - ) + let mut notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions { + hooks_amqp_url: Some(amqp_url), + hooks_amqp_declare_exchange: true, + hooks_amqp_declare_queues: true, + hooks_amqp_durable_exchange: false, + hooks_amqp_durable_queues: false, + hooks_amqp_celery: true, + hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(), + hooks_amqp_exchange_kind: String::from("topic"), + hooks_amqp_routing_key: None, + hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(), + hooks_amqp_connection_pool_size: 1, + hooks_amqp_channel_pool_size: 1, + hooks_amqp_idle_connection_timeout: None, + hooks_amqp_idle_channels_timeout: None, + }) .await .unwrap(); notifier.prepare().await.unwrap(); @@ -191,14 +266,7 @@ mod tests { .send_message(test_msg.clone(), hook.clone(), &HeaderMap::new()) .await .unwrap(); - let chan = notifier - .pool - .get() - .await - .unwrap() - .create_channel() - .await - .unwrap(); + let chan = notifier.channel_pool.get().await.unwrap(); let message = chan .basic_get( format!("{}.{}", notifier.queues_prefix.as_str(), hook).as_str(), @@ -220,20 +288,22 @@ mod tests { #[actix_rt::test] async fn unknown_url() { - let notifier = AMQPNotifier::new( - "http://unknown", - "test", - "test", - "topic", - None, - DeclareOptions { - declare_exchange: false, - declare_queues: false, - durable_queues: false, - durable_exchange: false, - }, - false, - ) + let notifier = AMQPNotifier::new(crate::config::AMQPHooksOptions { + hooks_amqp_url: Some(String::from("http://unknown")), + hooks_amqp_declare_exchange: true, + hooks_amqp_declare_queues: true, + hooks_amqp_durable_exchange: false, + hooks_amqp_durable_queues: false, + hooks_amqp_celery: false, + hooks_amqp_exchange: uuid::Uuid::new_v4().to_string(), + hooks_amqp_exchange_kind: String::from("topic"), + hooks_amqp_routing_key: None, + hooks_amqp_queues_prefix: uuid::Uuid::new_v4().to_string(), + hooks_amqp_connection_pool_size: 1, + hooks_amqp_channel_pool_size: 1, + hooks_amqp_idle_connection_timeout: None, + hooks_amqp_idle_channels_timeout: None, + }) .await .unwrap(); let res = notifier diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 5326250..c9f449c 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -46,39 +46,16 @@ impl NotificationManager { ))); } #[cfg(feature = "amqp_notifier")] - if rustus_config.notification_opts.hooks_amqp_url.is_some() { + if rustus_config + .notification_opts + .amqp_hook_opts + .hooks_amqp_url + .is_some() + { debug!("Found AMQP notifier."); manager.notifiers.push(Box::new( amqp_notifier::AMQPNotifier::new( - rustus_config - .notification_opts - .hooks_amqp_url - .as_ref() - .unwrap(), - rustus_config.notification_opts.hooks_amqp_exchange.as_str(), - rustus_config - .notification_opts - .hooks_amqp_queues_prefix - .as_str(), - rustus_config - .notification_opts - .hooks_amqp_exchange_kind - .as_str(), - rustus_config - .notification_opts - .hooks_amqp_routing_key - .clone(), - amqp_notifier::DeclareOptions { - declare_exchange: rustus_config - .notification_opts - .hooks_amqp_declare_exchange, - declare_queues: rustus_config.notification_opts.hooks_amqp_declare_queues, - durable_exchange: rustus_config - .notification_opts - .hooks_amqp_durable_exchange, - durable_queues: rustus_config.notification_opts.hooks_amqp_durable_queues, - }, - rustus_config.notification_opts.hooks_amqp_celery, + rustus_config.notification_opts.amqp_hook_opts.clone(), ) .await?, ));