Skip to content

Instantly share code, notes, and snippets.

@colbyhall
Last active April 21, 2021 19:33
Show Gist options
  • Save colbyhall/8b4b36dab6c6ec3b0c43d61fec9371da to your computer and use it in GitHub Desktop.
Save colbyhall/8b4b36dab6c6ec3b0c43d61fec9371da to your computer and use it in GitHub Desktop.
#![feature(trait_alias)]
#![feature(box_syntax)]
use std::thread;
use std::sync::{
Arc,
Mutex,
Condvar,
};
use std::sync::atomic::{ AtomicBool, Ordering };
use std::collections::VecDeque;
pub trait Job = FnOnce() + Send + Sync + 'static;
struct ThreadPoolInner {
queue: Mutex<VecDeque<Box<dyn Job>>>,
on_change: Condvar,
is_running: AtomicBool,
}
#[derive(Clone)]
pub struct ThreadPool(Arc<ThreadPoolInner>);
impl ThreadPool {
pub fn new(num_threads: usize) -> Self {
let state = Arc::new(ThreadPoolInner {
queue: Mutex::new(VecDeque::new()),
on_change: Condvar::new(),
is_running: AtomicBool::new(true),
});
for _ in 0..num_threads {
let pool = state.clone();
thread::spawn(move || {
while pool.is_running.load(Ordering::Relaxed) {
let mut queue = pool.queue.lock().unwrap();
let job = queue.pop_front();
match job {
Some(job) => job(),
None => {}
}
}
});
}
Self(state)
}
pub fn enqueue<F, T>(&self, f: F) -> Future<T>
where
F: FnOnce() -> T,
F: Send + Sync + 'static,
T: Send + Sync + 'static
{
let mut queue = self.0.queue.lock().unwrap();
let result: Future<T> = Future::new();
let their_future = result.clone();
queue.push_back(box move || {
let (inner, on_change) = &*their_future.0;
{
let mut guard = inner.lock().unwrap();
*guard = Some(f());
}
on_change.notify_all();
});
self.0.on_change.notify_one();
result
}
pub fn shutdown(&self) {
self.0.is_running.store(false, Ordering::Relaxed);
}
}
pub struct Future<T>(Arc<(Mutex<Option<T>>, Condvar)>);
impl<T> Clone for Future<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> Future<T> {
pub fn new() -> Self {
Self(Arc::new((Mutex::new(None), Condvar::new())))
}
pub fn get(&self) -> T {
let (inner, on_change) = &*self.0;
let mut guard = inner.lock().unwrap();
if guard.is_some() {
return guard.take().unwrap();
}
let mut guard = on_change.wait(guard).unwrap();
guard.take().unwrap()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment