Skip to content

Commit

Permalink
Add initial async implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrosland committed Nov 30, 2019
1 parent d7dbcb1 commit 6eebf84
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 37 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ mmal-sys = "0.1.0-3"
libc = "0.2"
parking_lot = "0.10"
scopeguard = "1.0"
futures = "0.3"

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }

[features]
default = []
Expand Down
38 changes: 38 additions & 0 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use rascam::*;
use std::time::Duration;
use tokio::fs::File;
use tokio::prelude::*;
use tokio::time::delay_for;

#[tokio::main]
async fn main() {
let info = info().unwrap();
if info.cameras.len() < 1 {
println!("Found 0 cameras. Exiting");
// note that this doesn't run destructors
::std::process::exit(1);
}
println!("{}", info);

let result = simple_async(&info.cameras[0]).await;
match result {
Ok(_) => println!("Saved image as image.jpg"),
Err(err) => {
println!("error: {}", err);
::std::process::exit(1);
}
}
}

async fn simple_async(info: &CameraInfo) -> Result<(), Box<dyn std::error::Error>> {
let mut camera = SimpleCamera::new(info.clone())?;
camera.activate()?;

delay_for(Duration::from_millis(2000)).await;

let b = camera.take_one_async().await?;
let mut file = File::create("image.jpg").await?;
file.write_all(&b).await?;

Ok(())
}
177 changes: 140 additions & 37 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use mmal_sys as ffi;
#[macro_use(defer_on_unwind)]
extern crate scopeguard;
use ffi::MMAL_STATUS_T;
use futures::future::FutureExt;
use futures::stream::StreamExt;
use parking_lot::{lock_api::RawMutex, Mutex};
use std::ffi::CStr;
use std::io::Write;
Expand Down Expand Up @@ -50,12 +52,20 @@ pub use ffi::MMAL_ENCODING_OPAQUE;

pub use ffi::MMAL_ENCODING_RGB24;

// type Future2 = Box<Future<Item = [u8], Error = ffi::MMAL_STATUS_T::Type>>;

struct Userdata {
pool: NonNull<ffi::MMAL_POOL_T>,
_guard: Arc<Mutex<()>>,
sender: mpsc::SyncSender<Option<BufferGuard>>,
sender: SenderKind,
}

pub enum SenderKind {
SyncSender(mpsc::SyncSender<Option<BufferGuard>>),
AsyncSender(futures::channel::mpsc::Sender<BufferGuard>),
}

enum ReceiverKind {
SyncReceiver(mpsc::Receiver<Option<BufferGuard>>),
AsyncReceiver(futures::channel::mpsc::Receiver<BufferGuard>),
}

/// Guard around a buffer header.
Expand Down Expand Up @@ -308,7 +318,7 @@ impl SeriousCamera {
}
}

pub unsafe fn set_buffer_callback(&mut self, sender: mpsc::SyncSender<Option<BufferGuard>>) {
pub unsafe fn set_buffer_callback(&mut self, sender: SenderKind) {
let port = if self.use_encoder {
(*self.encoder.unwrap().as_ref().output.offset(0))
} else {
Expand All @@ -317,7 +327,7 @@ impl SeriousCamera {

let userdata = Userdata {
pool: self.pool.unwrap(),
sender,
sender: sender,
_guard: Arc::clone(&self.mutex),
};

Expand Down Expand Up @@ -795,7 +805,8 @@ impl SeriousCamera {
fn do_take(
&mut self,
buffer_port_ptr: &mut *mut ffi::MMAL_PORT_T,
) -> Result<mpsc::Receiver<Option<BufferGuard>>, CameraError> {
is_async: bool,
) -> Result<ReceiverKind, CameraError> {
unsafe {
let mut status = ffi::mmal_port_parameter_set_uint32(
self.camera.as_ref().control,
Expand All @@ -815,8 +826,10 @@ impl SeriousCamera {
if !self.encoder_output_port_enabled {
self.enable_encoder_port().unwrap();
}
} else if !self.still_port_enabled {
self.enable_still_port().unwrap();
} else {
if !self.still_port_enabled {
self.enable_still_port().unwrap();
}
}

let output = self.camera.as_ref().output;
Expand All @@ -835,7 +848,19 @@ impl SeriousCamera {
// Send all the buffers to the camera output port
self.send_buffers(*buffer_port_ptr)?;

let (sender, receiver) = mpsc::sync_channel(0);
let (sender, receiver) = if is_async {
let (sender, receiver) = futures::channel::mpsc::channel(0);
(
SenderKind::AsyncSender(sender),
ReceiverKind::AsyncReceiver(receiver),
)
} else {
let (sender, receiver) = mpsc::sync_channel(0);
(
SenderKind::SyncSender(sender),
ReceiverKind::SyncReceiver(receiver),
)
};

self.set_buffer_callback(sender);

Expand Down Expand Up @@ -877,15 +902,52 @@ impl SeriousCamera {
unsafe { mutex.force_unlock() };
}}

self.do_take(&mut buffer_port_ptr).map_err(|e| {
unsafe {
if !buffer_port_ptr.is_null() && !(*buffer_port_ptr).userdata.is_null() {
drop_port_userdata(buffer_port_ptr);
self.do_take(&mut buffer_port_ptr, false)
.map_err(|e| {
unsafe {
if !buffer_port_ptr.is_null() && !(*buffer_port_ptr).userdata.is_null() {
drop_port_userdata(buffer_port_ptr);
}
self.mutex.force_unlock();
}
self.mutex.force_unlock();
}
e
})
e
})
.map(|receiver| match receiver {
ReceiverKind::SyncReceiver(receiver) => receiver,
ReceiverKind::AsyncReceiver(_) => unreachable!(),
})
}

pub fn take_async(
&mut self,
) -> Result<futures::channel::mpsc::Receiver<BufferGuard>, CameraError> {
unsafe {
self.mutex.raw().lock();
}

let mut buffer_port_ptr = ptr::null_mut();
let mutex = Arc::clone(&self.mutex);

defer_on_unwind! {{
unsafe { mutex.force_unlock() };
}}

self.do_take(&mut buffer_port_ptr, true)
.map_err(|e| {
unsafe {
if buffer_port_ptr != ptr::null_mut()
&& (*buffer_port_ptr).userdata != ptr::null_mut()
{
drop_port_userdata(buffer_port_ptr);
}
self.mutex.force_unlock();
}
e
})
.map(|receiver| match receiver {
ReceiverKind::AsyncReceiver(receiver) => receiver,
ReceiverKind::SyncReceiver(_) => unreachable!(),
})
}
}

Expand Down Expand Up @@ -915,20 +977,33 @@ unsafe extern "C" fn camera_buffer_callback(
if bytes_to_write > 0 {
ffi::mmal_buffer_header_mem_lock(buffer);

userdata
.sender
.send(Some(BufferGuard::new(
port,
buffer,
userdata.pool,
complete,
)))
.unwrap();
} else {
if let Err(_err) = userdata.sender.send(None) {
#[cfg(feature = "debug")]
println!("Got err sending None: {}", _err);
match &mut userdata.sender {
SenderKind::AsyncSender(sender) => {
sender
.try_send(BufferGuard::new(port, buffer, userdata.pool, complete))
.unwrap();
}
SenderKind::SyncSender(sender) => {
sender
.send(Some(BufferGuard::new(
port,
buffer,
userdata.pool,
complete,
)))
.unwrap();
}
}
} else {
let _result = match &mut userdata.sender {
SenderKind::AsyncSender(sender) => sender.close_channel(),
SenderKind::SyncSender(sender) => {
if let Err(_err) = sender.send(None) {
#[cfg(feature = "debug")]
println!("Got err sending None: {}", _err);
}
}
};
}
} else {
#[cfg(feature = "debug")]
Expand Down Expand Up @@ -1135,15 +1210,43 @@ impl SimpleCamera {
self.take_one_writer(&mut v)?;
Ok(v)
}

/// Captures a single image from the camera asynchronously.
///
/// Returns a future result where `Ok` contains a `Vec<u8>` containing the bytes of the image.
pub async fn take_one_async(&mut self) -> Result<Vec<u8>, CameraError> {
let receiver = self.serious.take_async()?;
let future = receiver
.fold(Vec::new(), |mut acc, buf| {
async move {
acc.extend(buf.get_bytes());
acc
}
})
.map(Ok);

future.await
}
}

/// Drops a port's userdata.
/// userdata must be non-null or will dereference a null pointer!
pub fn drop_port_userdata(port: *mut ffi::MMAL_PORT_T) {
unsafe {
let userdata: Box<Userdata> = Box::from_raw((*port).userdata as *mut Userdata);
userdata._guard.force_unlock();
drop(userdata);
(*port).userdata = ptr::null_mut() as *mut ffi::MMAL_PORT_USERDATA_T;
///
/// # Safety
///
/// `port.userdata` must be non-null or this will dereference a null pointer.
pub unsafe fn drop_port_userdata(port: *mut ffi::MMAL_PORT_T) {
let userdata: Box<Userdata> = Box::from_raw((*port).userdata as *mut Userdata);
userdata._guard.force_unlock();
drop(userdata);
(*port).userdata = ptr::null_mut() as *mut ffi::MMAL_PORT_USERDATA_T;
}

trait Sender {
fn try_send(&mut self, msg: BufferGuard);
}

impl Sender for futures::channel::mpsc::Sender<BufferGuard> {
fn try_send(&mut self, msg: BufferGuard) {
futures::channel::mpsc::Sender::try_send(self, msg).unwrap()
}
}

0 comments on commit 6eebf84

Please sign in to comment.