Skip to content

Commit

Permalink
Fix issue preventing reads after flush on a file handle (awslabs#751)
Browse files Browse the repository at this point in the history
* Add test to reproduce bad descriptor issue

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Fix issue preventing reads after flush on a file handle

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Initialize file handle type at open

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Ignore overwrite after read test

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Add entry in CHANGELOG

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Raise logging of file handle type choice to debug

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Fix panic message in read test

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Ensure consistent behavior in read test

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Add test for write-only handle opening empty file and closing it

Signed-off-by: Daniel Carl Jones <djonesoa@amazon.com>

* Add changelog entry refering to eager file handle initialization

Signed-off-by: Daniel Carl Jones <djonesoa@amazon.com>

---------

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
Signed-off-by: Daniel Carl Jones <djonesoa@amazon.com>
Co-authored-by: Daniel Carl Jones <djonesoa@amazon.com>
  • Loading branch information
passaro and dannycjones authored Feb 16, 2024
1 parent 77ee71d commit dd901f3
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 284 deletions.
4 changes: 4 additions & 0 deletions mountpoint-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## Unreleased

### Other changes
* Fix an issue where read file handles could be closed too early, leading to bad file descriptor errors on subsequent reads. As a consequence of this fix, opening an existing file to overwrite it **immediately** after closing a read file handle may occasionally fail with an "Operation not permitted" error. In such cases, Mountpoint logs will also report that the file is "not writable while being read". ([#751](https://github.com/awslabs/mountpoint-s3/pull/751))
* File handles are no longer initialized lazily. Lazy initialization was introduced in version 1.4.0 but is reverted in this change. If upgrading from 1.4.0, you may see errors that were previously deferred until read/write now raised at open time. ([#751](https://github.com/awslabs/mountpoint-s3/pull/751))

## v1.4.0 (January 26, 2024)

### New features
Expand Down
157 changes: 28 additions & 129 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,10 @@ where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
/// A state where the file handle is created but the type is not yet determined
Created { lookup: LookedUp, flags: i32, pid: u32 },
/// The file handle has been assigned as a read handle
Read {
request: Prefetcher::PrefetchResult<Client>,
pid: u32,
},
Read(Prefetcher::PrefetchResult<Client>),
/// The file handle has been assigned as a write handle
Write(UploadState<Client>),
/// The file handle is already closed, currently only used to tell that the read is finished
Closed,
}

impl<Client, Prefetcher> std::fmt::Debug for FileHandleState<Client, Prefetcher>
Expand All @@ -86,15 +79,8 @@ where
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FileHandleState::Created { lookup, flags, pid } => f
.debug_struct("Created")
.field("lookup", lookup)
.field("flags", flags)
.field("pid", pid)
.finish(),
FileHandleState::Read { request: _, pid } => f.debug_struct("Read").field("pid", pid).finish(),
FileHandleState::Read(_) => f.debug_struct("Read").finish(),
FileHandleState::Write(arg0) => f.debug_tuple("Write").field(arg0).finish(),
FileHandleState::Closed => f.debug_struct("Closed").finish(),
}
}
}
Expand All @@ -104,22 +90,13 @@ where
Client: ObjectClient + Send + Sync,
Prefetcher: Prefetch,
{
async fn new(lookup: LookedUp, flags: i32, pid: u32) -> Self {
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
FileHandleState::Created { lookup, flags, pid }
}

async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleState<Client, Prefetcher>, Error> {
if flags & libc::O_ACCMODE == libc::O_RDONLY {
return Err(err!(libc::EBADF, "file handle is not open for writes"));
}

let is_truncate = flags & libc::O_TRUNC != 0;
let handle = fs
.superblock
Expand All @@ -146,13 +123,8 @@ where

async fn new_read_handle(
lookup: &LookedUp,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleState<Client, Prefetcher>, Error> {
if flags & libc::O_WRONLY != 0 {
return Err(err!(libc::EBADF, "file handle is not open for reads",));
}
if !lookup.stat.is_readable {
return Err(err!(
libc::EACCES,
Expand All @@ -169,7 +141,7 @@ where
let request = fs
.prefetcher
.prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
let handle = FileHandleState::Read { request, pid };
let handle = FileHandleState::Read(request);
metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(handle)
}
Expand Down Expand Up @@ -726,10 +698,29 @@ where
return Err(err!(libc::EINVAL, "O_SYNC and O_DSYNC are not supported"));
}

// All file handles will be lazy initialized on first read/write.
let state = FileHandleState::new(lookup, flags, pid).await.into();
let state = if flags & libc::O_RDWR != 0 {
let is_truncate = flags & libc::O_TRUNC != 0;
if !remote_file || (self.config.allow_overwrite && is_truncate) {
// If the file is new or opened in truncate mode, we know it must be a write handle.
debug!("fs:open choosing write handle for O_RDWR");
FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?
} else {
// Otherwise, it must be a read handle.
debug!("fs:open choosing read handle for O_RDWR");
FileHandleState::new_read_handle(&lookup, self).await?
}
} else if flags & libc::O_WRONLY != 0 {
FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?
} else {
FileHandleState::new_read_handle(&lookup, self).await?
};

let fh = self.next_handle();
let handle = FileHandle { inode, full_key, state };
let handle = FileHandle {
inode,
full_key,
state: AsyncMutex::new(state),
};
debug!(fh, ino, "new file handle created");
self.file_handles.write().await.insert(fh, Arc::new(handle));

Expand Down Expand Up @@ -766,19 +757,8 @@ where
logging::record_name(handle.inode.name());
let mut state = handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid, .. } => {
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");

*state = FileHandleState::new_read_handle(lookup, *flags, *pid, self).await?;
if let FileHandleState::Read { request, .. } = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
}
FileHandleState::Read { request, .. } => request,
FileHandleState::Read(request) => request,
FileHandleState::Write(_) => return Err(err!(libc::EBADF, "file handle is not open for reads")),
FileHandleState::Closed => return Err(err!(libc::EBADF, "file handle is already closed")),
};

match request.read(offset as u64, size as usize).await {
Expand Down Expand Up @@ -870,18 +850,8 @@ where
let len = {
let mut state = handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid } => {
*state = FileHandleState::new_write_handle(lookup, ino, *flags, *pid, self).await?;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
if let FileHandleState::Write(request) = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
}
FileHandleState::Read { .. } => return Err(err!(libc::EBADF, "file handle is not open for writes")),
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Err(err!(libc::EBADF, "file handle is already closed")),
};

request.write(offset, data, &handle.full_key).await?
Expand Down Expand Up @@ -1097,32 +1067,8 @@ where
logging::record_name(file_handle.inode.name());
let mut state = file_handle.state.lock().await;
let request = match &mut *state {
FileHandleState::Created { lookup, flags, pid } => {
// This happens when users call fsync without any read() or write() requests,
// since we don't know what type of handle it would be we need to consider what
// to do next for both cases.
// * if the file is new or opened in truncate mode, we know it must be a write
// handle so we can start an upload and complete it immediately, result in an
// empty file.
// * if the file already exists and it is not opened in truncate mode, we still
// can't be sure of its type so we will do nothing and just return ok.
let is_new_file = !lookup.inode.is_remote()?;
let is_truncate = *flags & libc::O_TRUNC != 0;
if is_new_file || is_truncate {
*state = FileHandleState::new_write_handle(lookup, lookup.inode.ino(), *flags, *pid, self).await?;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
if let FileHandleState::Write(request) = &mut *state {
request
} else {
unreachable!("handle type always be assigned above");
}
} else {
return Ok(());
}
}
FileHandleState::Read { .. } => return Ok(()),
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Ok(()),
};
self.complete_upload(request, &file_handle.full_key, false, None).await
}
Expand All @@ -1141,10 +1087,6 @@ where
// process. In many cases, the child will then immediately close (flush) the duplicated
// file descriptors. We will not complete the upload if we can detect that the process
// invoking flush is different from the one that originally opened the file.
//
// The same for read path. We want to stop the prefetcher and decrease the reader count
// as soon as users close a file descriptor so that we don't block users from doing other
// operation like overwrite the file.
let file_handle = {
let file_handles = self.file_handles.read().await;
match file_handles.get(&fh) {
Expand All @@ -1155,30 +1097,11 @@ where
logging::record_name(file_handle.inode.name());
let mut state = file_handle.state.lock().await;
match &mut *state {
FileHandleState::Created { .. } => Ok(()),
FileHandleState::Read { pid: open_pid, .. } => {
if !are_from_same_process(*open_pid, pid) {
trace!(
file_handle.full_key,
pid,
open_pid,
"not stopping prefetch because current pid differs from pid at open"
);
return Ok(());
}
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
file_handle.inode.finish_reading()?;

// Mark the file handle state as closed so we only update the reader count once
*state = FileHandleState::Closed;
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
Ok(())
}
FileHandleState::Read { .. } => Ok(()),
FileHandleState::Write(request) => {
self.complete_upload(request, &file_handle.full_key, true, Some(pid))
.await
}
FileHandleState::Closed => Ok(()),
}
}

Expand Down Expand Up @@ -1210,38 +1133,14 @@ where
}
};

let mut state = file_handle.state.into_inner();
let request = match state {
FileHandleState::Created { lookup, flags, pid } => {
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "unassigned");
// This happens when release is called before any read() or write(),
// since we don't know what type of handle it would be we need to consider
// what to do next for both cases.
// * if the file is new or opened in truncate mode, we know it must be a write
// handle so we can start an upload from here.
// * if the file already exists and it is not opened in truncate mode, we still
// can't be sure of its type so we will just drop it.
let is_new_file = !lookup.inode.is_remote()?;
let is_truncate = flags & libc::O_TRUNC != 0;
if is_new_file || is_truncate {
state = FileHandleState::new_write_handle(&lookup, lookup.inode.ino(), flags, pid, self).await?;
if let FileHandleState::Write(request) = state {
request
} else {
unreachable!("handle type always be assigned above");
}
} else {
return Ok(());
}
}
let request = match file_handle.state.into_inner() {
FileHandleState::Read { .. } => {
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
file_handle.inode.finish_reading()?;
return Ok(());
}
FileHandleState::Write(request) => request,
FileHandleState::Closed => return Ok(()),
};

let result = request.complete_if_in_progress(&file_handle.full_key).await;
Expand Down
48 changes: 9 additions & 39 deletions mountpoint-s3/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,8 @@ async fn test_sequential_write(write_size: usize) {
let file_ino = dentry.attr.ino;

// First let's check that we can't write it again
let fh = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.unwrap()
.fh;
let result = fs
.write(file_ino, fh, offset, &[0xaa; 27], 0, 0, None)
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect_err("file should not be overwritable")
.to_errno();
Expand Down Expand Up @@ -700,22 +695,13 @@ async fn test_duplicate_write_fails() {
assert_eq!(dentry.attr.size, 0);
let file_ino = dentry.attr.ino;

let opened = fs
let _opened = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.unwrap();
_ = fs
.write(file_ino, opened.fh, 0, &[0xaa; 27], 0, 0, None)
.await
.expect("first write should succeed");

let opened = fs
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect("open should succeed");
// Should not be allowed to write the file a second time
let err = fs
.write(file_ino, opened.fh, 0, &[0xaa; 27], 0, 0, None)
.open(file_ino, libc::S_IFREG as i32 | libc::O_WRONLY, 0)
.await
.expect_err("should not be able to write twice")
.to_errno();
Expand Down Expand Up @@ -1248,20 +1234,12 @@ async fn test_flexible_retrieval_objects() {
let getattr = fs.getattr(entry.ino).await.unwrap();
assert_eq!(flexible_retrieval, getattr.attr.perm == 0);

let open = fs
.open(entry.ino, libc::O_RDONLY, 0)
.await
.expect("open should succeed");
let read_result = fs.read(entry.ino, open.fh, 0, 4096, 0, None).await;
let open = fs.open(entry.ino, libc::O_RDONLY, 0).await;
if flexible_retrieval {
let err = read_result.expect_err("can't read flexible retrieval objects");
let err = open.expect_err("can't open flexible retrieval objects");
assert_eq!(err.to_errno(), libc::EACCES);
} else {
assert_eq!(
&read_result.unwrap()[..],
b"hello world",
"instant retrieval files are readable"
);
let open = open.expect("instant retrieval files are readable");
fs.release(entry.ino, open.fh, 0, None, true).await.unwrap();
}
}
Expand All @@ -1287,20 +1265,12 @@ async fn test_flexible_retrieval_objects() {
let getattr = fs.getattr(lookup.attr.ino).await.unwrap();
assert_eq!(flexible_retrieval, getattr.attr.perm == 0);

let open = fs
.open(lookup.attr.ino, libc::O_RDONLY, 0)
.await
.expect("open should succeed");
let read_result = fs.read(lookup.attr.ino, open.fh, 0, 4096, 0, None).await;
let open = fs.open(lookup.attr.ino, libc::O_RDONLY, 0).await;
if flexible_retrieval {
let err = read_result.expect_err("can't read flexible retrieval objects");
let err = open.expect_err("can't open flexible retrieval objects");
assert_eq!(err.to_errno(), libc::EACCES);
} else {
assert_eq!(
&read_result.unwrap()[..],
b"hello world",
"instant retrieval files are readable"
);
let open = open.expect("instant retrieval files are readable");
fs.release(lookup.attr.ino, open.fh, 0, None, true).await.unwrap();
}
}
Expand Down
Loading

0 comments on commit dd901f3

Please sign in to comment.