fix(aws): deal with closed connections via retries (#569)
twuebi authored Nov 27, 2024
1 parent 0bd4de5 commit bbda2c4
Showing 10 changed files with 146 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration_test_workflow.yml
@@ -70,5 +70,5 @@ jobs:
name: db-dump-${{ inputs.test_name }}
path: dump.sql.gz
- name: Dump docker logs on failure
if: failure()
if: always()
uses: jwalton/gh-docker-logs@v2
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -123,6 +123,7 @@ tower-http = { version = "^0.6", features = [
] }
tracing = { version = "^0.1", features = ["attributes"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
tryhard = { version = "0.5.1" }
urlencoding = "^2.1"
async-stream = "0.3.6"
utoipa = { version = "4.2.3", features = [
1 change: 1 addition & 0 deletions crates/iceberg-catalog/Cargo.toml
@@ -96,6 +96,7 @@ tower-http = { workspace = true, optional = true, features = [
"cors",
] }
tracing = { workspace = true }
tryhard = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
utoipa = { workspace = true, features = ["uuid"] }
107 changes: 60 additions & 47 deletions crates/iceberg-catalog/src/catalog/io.rs
@@ -1,4 +1,6 @@
use super::compression_codec::CompressionCodec;
use crate::api::{ErrorModel, Result};
use crate::retry::retry_fn;
use crate::service::storage::path_utils;
use futures::stream::BoxStream;
use futures::StreamExt;
@@ -7,8 +9,6 @@ use iceberg_ext::catalog::rest::IcebergErrorResponse;
use iceberg_ext::configs::Location;
use serde::Serialize;

use super::compression_codec::CompressionCodec;

pub(crate) async fn write_metadata_file(
metadata_location: &Location,
metadata: impl Serialize,
@@ -27,23 +27,17 @@ pub(crate) async fn write_metadata_file(
.new_output(metadata_location)
.map_err(IoError::FileCreation)?;

let mut writer = metadata_file
.writer()
.await
.map_err(IoError::FileWriterCreation)?;

let buf = serde_json::to_vec(&metadata).map_err(IoError::Serialization)?;

let metadata_bytes = compression_codec.compress(&buf[..])?;

writer
.write(metadata_bytes.into())
.await
.map_err(|e| IoError::FileWrite(Box::new(e)))?;

writer.close().await.map_err(IoError::FileClose)?;

Ok(())
retry_fn(|| async {
metadata_file
.write(metadata_bytes.clone().into())
.await
.map_err(IoError::FileWriterCreation)
})
.await
}

pub(crate) async fn delete_file(file_io: &FileIO, location: &Location) -> Result<(), IoError> {
@@ -54,12 +48,14 @@ pub(crate) async fn delete_file(file_io: &FileIO, location: &Location) -> Result
location.to_string()
};

file_io
.delete(location)
.await
.map_err(IoError::FileDelete)?;

Ok(())
retry_fn(|| async {
file_io
.clone()
.delete(location.clone())
.await
.map_err(IoError::FileDelete)
})
.await
}

pub(crate) async fn read_file(file_io: &FileIO, file: &Location) -> Result<Vec<u8>, IoError> {
@@ -70,11 +66,18 @@ pub(crate) async fn read_file(file_io: &FileIO, file: &Location) -> Result<Vec<u
file.to_string()
};

let inp = file_io.new_input(file).map_err(IoError::FileCreation)?;
inp.read()
.await
.map_err(|e| IoError::FileRead(Box::new(e)))
.map(|r| r.to_vec())
retry_fn(|| async {
// InputFile isn't Clone, so the input is constructed inside the retry closure
file_io
.clone()
.new_input(file.clone())
.map_err(IoError::FileCreation)?
.read()
.await
.map_err(|e| IoError::FileRead(Box::new(e)))
.map(Into::into)
})
.await
}

pub(crate) async fn remove_all(file_io: &FileIO, location: &Location) -> Result<(), IoError> {
@@ -85,12 +88,14 @@ pub(crate) async fn remove_all(file_io: &FileIO, location: &Location) -> Result<
location.to_string()
};

file_io
.remove_all(location)
.await
.map_err(IoError::FileRemoveAll)?;

Ok(())
retry_fn(|| async {
file_io
.clone()
.remove_all(location.clone())
.await
.map_err(IoError::FileRemoveAll)
})
.await
}

pub(crate) const DEFAULT_LIST_LOCATION_PAGE_SIZE: usize = 1000;
@@ -102,21 +107,29 @@ pub(crate) async fn list_location<'a>(
) -> Result<BoxStream<'a, std::result::Result<Vec<String>, IoError>>, IoError> {
let location = path_utils::reduce_scheme_string(location.as_str(), false);
tracing::debug!("Listing location: {}", location);
let entries = file_io
.list_paginated(
format!("{}/", location.trim_end_matches('/')).as_str(),
true,
page_size.unwrap_or(DEFAULT_LIST_LOCATION_PAGE_SIZE),
)
.await
.map_err(IoError::List)?
.map(|res| match res {
Ok(entries) => Ok(entries
.into_iter()
.map(|it| it.path().to_string())
.collect()),
Err(e) => Err(IoError::List(e)),
});
let location = format!("{}/", location.trim_end_matches('/'));
let size = page_size.unwrap_or(DEFAULT_LIST_LOCATION_PAGE_SIZE);

let entries = retry_fn(|| async {
file_io
.list_paginated(location.clone().as_str(), true, size)
.await
.map_err(|e| {
tracing::warn!(
?e,
"Failed to list files in location, gonna retry three times.."
);
IoError::List(e)
})
})
.await?
.map(|res| match res {
Ok(entries) => Ok(entries
.into_iter()
.map(|it| it.path().to_string())
.collect()),
Err(e) => Err(IoError::List(e)),
});
Ok(entries.boxed())
}
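Because `retry_fn` takes a plain `Fn` closure that may run once per attempt, the I/O helpers above clone `file_io` and the location into each attempt rather than moving them in. A minimal sketch of that pattern, assuming a hypothetical `do_request` operation and cheaply cloneable handles (only the `tryhard` builder calls come from this commit):

```rust
use std::time::Duration;

// Hypothetical transient operation, used only to illustrate the call shape.
async fn do_request(client: String, target: String) -> Result<(), std::io::Error> {
    println!("request to {target} via {client}");
    Ok(())
}

async fn with_retries(client: &str, target: &str) -> Result<(), std::io::Error> {
    tryhard::retry_fn(|| async {
        // Each attempt gets fresh copies; the closure itself only borrows.
        do_request(client.to_owned(), target.to_owned()).await
    })
    .retries(3)
    .exponential_backoff(Duration::from_millis(100))
    .max_delay(Duration::from_secs(1))
    .await
}
```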

51 changes: 36 additions & 15 deletions crates/iceberg-catalog/src/catalog/tables.rs
@@ -18,7 +18,9 @@ use crate::request_metadata::RequestMetadata;
use crate::service::authz::{CatalogNamespaceAction, CatalogTableAction, CatalogWarehouseAction};
use crate::service::contract_verification::{ContractVerification, ContractVerificationOutcome};
use crate::service::event_publisher::{CloudEventsPublisher, EventMetadata};
use crate::service::storage::{StorageLocations as _, StoragePermissions, StorageProfile};
use crate::service::storage::{
StorageLocations as _, StoragePermissions, StorageProfile, ValidationError,
};
use crate::service::task_queue::tabular_expiration_queue::TabularExpirationInput;
use crate::service::task_queue::tabular_purge_queue::TabularPurgeInput;
use crate::service::TabularIdentUuid;
@@ -36,6 +38,7 @@ use std::str::FromStr as _;

use crate::catalog;
use crate::catalog::tabular::list_entities;
use crate::retry::retry_fn;
use http::StatusCode;
use iceberg::spec::{
FormatVersion, MetadataLog, SchemaId, SortOrder, TableMetadata, TableMetadataBuildResult,
@@ -222,20 +225,38 @@ impl<C: Catalog, A: Authorizer + Clone, S: SecretStore>
};

let file_io = storage_profile.file_io(storage_secret.as_ref())?;

crate::service::storage::check_location_is_empty(
&file_io,
&table_location,
storage_profile,
|| crate::service::storage::ValidationError::InvalidLocation {
reason: "Unexpected files in location, tabular locations have to be empty"
.to_string(),
location: table_location.to_string(),
source: None,
storage_type: storage_profile.storage_type(),
},
)
.await?;
retry_fn(|| async {
match crate::service::storage::check_location_is_empty(
&file_io,
&table_location,
storage_profile,
|| crate::service::storage::ValidationError::InvalidLocation {
reason: "Unexpected files in location, tabular locations have to be empty"
.to_string(),
location: table_location.to_string(),
source: None,
storage_type: storage_profile.storage_type(),
},
)
.await
{
Err(e @ ValidationError::IoOperationFailed(_, _)) => {
tracing::warn!(
"Error while checking location is empty: {e}, retrying up to three times.."
);
Err(e)
}
Ok(()) => {
tracing::debug!("Location is empty");
Ok(Ok(()))
}
Err(other) => {
tracing::error!("Unrecoverable error: {other:?}");
Ok(Err(other))
}
}
})
.await??;

if let Some(metadata_location) = &metadata_location {
let compression_codec = CompressionCodec::try_from_metadata(&table_metadata)?;
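The closure above separates retryable from permanent failures by nesting results: the outer `Result` drives `retry_fn` (only `IoOperationFailed` triggers another attempt), the inner one carries the final outcome, and `.await??` unwraps both layers. A minimal sketch of the pattern with a hypothetical error type and a stand-in `check_once` operation:

```rust
use crate::retry::retry_fn;

// Hypothetical error type, used only to illustrate the nested-Result pattern.
#[derive(Debug)]
enum CheckError {
    IoOperationFailed(String),
    InvalidLocation(String),
}

// Stand-in for a single check_location_is_empty attempt.
async fn check_once() -> Result<(), CheckError> {
    Ok(())
}

async fn check_with_retries() -> Result<(), CheckError> {
    retry_fn(|| async {
        match check_once().await {
            // Outer Err: tryhard schedules another attempt.
            Err(e @ CheckError::IoOperationFailed(_)) => Err(e),
            // Outer Ok stops retrying; the inner Result is the final verdict.
            Ok(()) => Ok(Ok(())),
            Err(other) => Ok(Err(other)),
        }
    })
    .await??; // first `?`: retries exhausted; second `?`: non-retryable error
    Ok(())
}
```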
1 change: 1 addition & 0 deletions crates/iceberg-catalog/src/lib.rs
@@ -21,6 +21,7 @@ pub mod api;

#[cfg(feature = "router")]
pub mod metrics;
mod retry;
#[cfg(feature = "router")]
pub(crate) mod tracing;

12 changes: 12 additions & 0 deletions crates/iceberg-catalog/src/retry.rs
@@ -0,0 +1,12 @@
use std::time::Duration;

pub(crate) async fn retry_fn<T, Z, E>(f: impl Fn() -> T) -> crate::api::Result<Z, E>
where
T: std::future::Future<Output = crate::api::Result<Z, E>>,
{
tryhard::retry_fn(f)
.retries(3)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1))
.await
}
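A minimal usage sketch of the new helper, with a hypothetical `flaky_call` and error type, and assuming `crate::api::Result<T, E>` is a plain `std::result::Result` alias: the wrapped operation is attempted once plus up to three retries, with exponential backoff starting at 100 ms and capped at 1 s.

```rust
use crate::retry::retry_fn;
use std::sync::atomic::{AtomicU32, Ordering};

// Hypothetical transient error, used only to show the call shape.
#[derive(Debug)]
struct TransientError;

// Fails the first two attempts, then succeeds.
async fn flaky_call(attempts: &AtomicU32) -> Result<u64, TransientError> {
    if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
        Err(TransientError)
    } else {
        Ok(42)
    }
}

async fn caller() -> Result<u64, TransientError> {
    let attempts = AtomicU32::new(0);
    // Succeeds on the third attempt; if all retries failed, the last
    // error would be returned to the caller instead.
    retry_fn(|| async { flaky_call(&attempts).await }).await
}
```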
41 changes: 32 additions & 9 deletions crates/iceberg-catalog/src/service/storage/mod.rs
@@ -21,6 +21,7 @@ use iceberg_ext::configs::table::TableProperties;
use iceberg_ext::configs::Location;
pub use s3::{S3Credential, S3Flavor, S3Location, S3Profile};

use crate::retry::retry_fn;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

@@ -324,16 +325,34 @@ impl StorageProfile {
.map_err(|e| ValidationError::IoOperationFailed(e, Box::new(self.clone())))?;

tracing::info!("Cleanup finished");

check_location_is_empty(&file_io, &test_location, self, || {
ValidationError::InvalidLocation {
reason: "Files are left after remove_all on test location".to_string(),
source: None,
location: test_location.to_string(),
storage_type: self.storage_type(),
retry_fn(|| async {
match check_location_is_empty(&file_io, &test_location, self, || {
ValidationError::InvalidLocation {
reason: "Files are left after remove_all on test location".to_string(),
source: None,
location: test_location.to_string(),
storage_type: self.storage_type(),
}
})
.await
{
Err(e @ ValidationError::IoOperationFailed(_, _)) => {
tracing::warn!(
"Error while checking location is empty: {e}, retrying up to three times.."
);
Err(e)
}
Ok(()) => {
tracing::debug!("Location is empty");
Ok(Ok(()))
}
Err(other) => {
tracing::error!("Unrecoverable error: {other:?}");
Ok(Err(other))
}
}
})
.await?;
.await??;
tracing::info!("checked location is empty");
Ok(())
}
@@ -648,9 +667,13 @@ pub(crate) async fn check_location_is_empty(

let mut entry_stream = list_location(file_io, location, Some(1))
.await
.map_err(|e| ValidationError::IoOperationFailed(e, Box::new(storage_profile.clone())))?;
.map_err(|e| {
tracing::warn!("Initing list location failed: {e}");
ValidationError::IoOperationFailed(e, Box::new(storage_profile.clone()))
})?;
while let Some(entries) = entry_stream.next().await {
let entries = entries.map_err(|e| {
tracing::warn!("Stream batch failed: {e}");
ValidationError::IoOperationFailed(e, Box::new(storage_profile.clone()))
})?;

8 changes: 1 addition & 7 deletions crates/iceberg-catalog/src/service/storage/s3.rs
@@ -15,17 +15,11 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;
use veil::Redact;

use super::StorageType;

static S3_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(|| {
reqwest::ClientBuilder::new()
.pool_idle_timeout(Duration::from_millis(18500))
.build()
.expect("This should never fail since we are just setting timeout to 18500 which does not populate config.error")
});
static S3_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);

#[derive(Debug, Eq, Clone, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "kebab-case")]
