Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into other_condition
Browse files Browse the repository at this point in the history
xudong963 authored May 25, 2023
2 parents 29b5f5e + 1197614 commit e03ea5e
Showing 12 changed files with 62 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -254,7 +254,7 @@ impl JoinHashTable {
// For right/full join, build side will appear at least once in the joined table
// Find the unmatched rows in build side
let mut unmatched_build_indexes = vec![];
for (chunk_index, chunk) in self.row_space.chunks.read().unwrap().iter().enumerate() {
for (chunk_index, chunk) in self.row_space.chunks.read().iter().enumerate() {
for row_index in 0..chunk.num_rows() {
if row_state[chunk_index][row_index] == 0 {
unmatched_build_indexes.push(RowPtr::new(chunk_index, row_index));
@@ -270,7 +270,7 @@ impl JoinHashTable {
&self,
unmatched_build_indexes: &Vec<RowPtr>,
) -> Result<DataBlock> {
let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
@@ -315,7 +315,7 @@ impl JoinHashTable {
// Record row in build side that is matched how many rows in probe side.
pub(crate) fn row_state_for_right_join(&self) -> Result<Vec<Vec<usize>>> {
let build_indexes = self.hash_join_desc.join_state.build_indexes.read();
let chunks = self.row_space.chunks.read().unwrap();
let chunks = self.row_space.chunks.read();
let mut row_state = Vec::with_capacity(chunks.len());
for chunk in chunks.iter() {
let mut rows = Vec::with_capacity(chunk.num_rows());
@@ -337,7 +337,7 @@ impl JoinHashTable {
}

pub(crate) fn rest_block(&self) -> Result<DataBlock> {
let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -41,7 +41,10 @@ pub trait HashJoinState: Send + Sync {
fn divide_finalize_task(&self) -> Result<()>;

/// Get the finalize task and using the `chunks` in `row_space` to build hash table in parallel.
fn finalize(&self) -> Result<bool>;
fn finalize(&self, task: (usize, usize)) -> Result<()>;

/// Get one finalize task
fn task(&self) -> Option<(usize, usize)>;

/// Detach to state: `finalize_count`.
fn finalize_end(&self) -> Result<()>;
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ use crate::sql::planner::plans::JoinType;
impl HashJoinState for JoinHashTable {
fn build(&self, input: DataBlock) -> Result<()> {
let data_block_size_limit = self.ctx.get_settings().get_max_block_size()? * 16;
let mut buffer = self.row_space.buffer.write().unwrap();
let mut buffer = self.row_space.buffer.write();
buffer.push(input);
let buffer_row_size = buffer.iter().fold(0, |acc, x| acc + x.num_rows());
if buffer_row_size < data_block_size_limit as usize {
@@ -88,23 +88,23 @@ impl HashJoinState for JoinHashTable {
}

fn attach(&self) -> Result<()> {
let mut count = self.build_count.lock().unwrap();
let mut count = self.build_count.lock();
*count += 1;
let mut count = self.finalize_count.lock().unwrap();
let mut count = self.finalize_count.lock();
*count += 1;
self.worker_num.fetch_add(1, Ordering::Relaxed);
Ok(())
}

fn build_end(&self) -> Result<()> {
let mut count = self.build_count.lock().unwrap();
let mut count = self.build_count.lock();
*count -= 1;
if *count == 0 {
// Divide the finalize phase into multiple tasks.
self.divide_finalize_task()?;

// Get the number of rows of the build side.
let chunks = self.row_space.chunks.read().unwrap();
let chunks = self.row_space.chunks.read();
let mut row_num = 0;
for chunk in chunks.iter() {
row_num += chunk.num_rows();
@@ -181,28 +181,25 @@ impl HashJoinState for JoinHashTable {
let hashtable = unsafe { &mut *self.hash_table.get() };
*hashtable = hashjoin_hashtable;

let mut is_built = self.is_built.lock().unwrap();
let mut is_built = self.is_built.lock();
*is_built = true;
self.built_notify.notify_waiters();
Ok(())
} else {
Ok(())
}
Ok(())
}

fn divide_finalize_task(&self) -> Result<()> {
{
let buffer = self.row_space.buffer.write().unwrap();
let buffer = self.row_space.buffer.write();
if !buffer.is_empty() {
let data_block = DataBlock::concat(&buffer)?;
self.add_build_block(data_block)?;
}
}

let chunks = self.row_space.chunks.read().unwrap();
let chunks = self.row_space.chunks.read();
let chunks_len = chunks.len();
if chunks_len == 0 {
self.unfinished_task_num.store(0, Ordering::Relaxed);
return Ok(());
}

@@ -216,28 +213,15 @@ impl HashJoinState for JoinHashTable {
let mut finalize_tasks = self.finalize_tasks.write();
for idx in 0..task_num - 1 {
let task = (idx * task_size, (idx + 1) * task_size);
finalize_tasks.push(task);
finalize_tasks.push_back(task);
}
let last_task = ((task_num - 1) * task_size, chunks_len);
finalize_tasks.push(last_task);

self.unfinished_task_num
.store(task_num as i32, Ordering::Relaxed);
finalize_tasks.push_back(last_task);

Ok(())
}

fn finalize(&self) -> Result<bool> {
// Get task
let task_idx = self.unfinished_task_num.fetch_sub(1, Ordering::SeqCst) - 1;
if task_idx < 0 {
return Ok(false);
}
let task = {
let finalize_tasks = self.finalize_tasks.read();
finalize_tasks[task_idx as usize]
};

fn finalize(&self, task: (usize, usize)) -> Result<()> {
let entry_size = self.entry_size.load(Ordering::Relaxed);
let mut local_raw_entry_spaces: Vec<Vec<u8>> = Vec::new();
let hashtable = unsafe { &mut *self.hash_table.get() };
@@ -357,7 +341,7 @@ impl HashJoinState for JoinHashTable {
}

let interrupt = self.interrupt.clone();
let chunks = self.row_space.chunks.read().unwrap();
let chunks = self.row_space.chunks.read();
let mut has_null = false;
for chunk_index in task.0..task.1 {
if interrupt.load(Ordering::Relaxed) {
@@ -425,63 +409,46 @@ impl HashJoinState for JoinHashTable {
}

{
let mut raw_entry_spaces = self.raw_entry_spaces.lock().unwrap();
let mut raw_entry_spaces = self.raw_entry_spaces.lock();
raw_entry_spaces.extend(local_raw_entry_spaces);
}
Ok(true)
Ok(())
}

fn task(&self) -> Option<(usize, usize)> {
let mut tasks = self.finalize_tasks.write();
tasks.pop_front()
}

fn finalize_end(&self) -> Result<()> {
let mut count = self.finalize_count.lock().unwrap();
let mut count = self.finalize_count.lock();
*count -= 1;
if *count == 0 {
let mut is_finalized = self.is_finalized.lock().unwrap();
let mut is_finalized = self.is_finalized.lock();
*is_finalized = true;
self.finalized_notify.notify_waiters();
Ok(())
} else {
Ok(())
}
Ok(())
}

#[async_backtrace::framed]
async fn wait_build_finish(&self) -> Result<()> {
let notified = {
let built_guard = self.is_built.lock().unwrap();

match *built_guard {
true => None,
false => Some(self.built_notify.notified()),
}
};

if let Some(notified) = notified {
notified.await;
if !*self.is_built.lock() {
self.built_notify.notified().await;
}

Ok(())
}

#[async_backtrace::framed]
async fn wait_finalize_finish(&self) -> Result<()> {
let notified = {
let finalized_guard = self.is_finalized.lock().unwrap();

match *finalized_guard {
true => None,
false => Some(self.finalized_notify.notified()),
}
};

if let Some(notified) = notified {
notified.await;
if !*self.is_finalized.lock() {
self.finalized_notify.notified().await;
}

Ok(())
}

fn mark_join_blocks(&self) -> Result<Vec<DataBlock>> {
let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
@@ -571,7 +538,7 @@ impl HashJoinState for JoinHashTable {
}

fn right_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result<Vec<DataBlock>> {
let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -13,12 +13,11 @@
// limitations under the License.

use std::cell::SyncUnsafeCell;
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::Mutex;

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::bitmap::MutableBitmap;
@@ -43,6 +42,7 @@ use common_hashtable::RowPtr;
use common_hashtable::StringHashJoinHashMap;
use common_sql::plans::JoinType;
use ethnum::U256;
use parking_lot::Mutex;
use parking_lot::RwLock;

use super::ProbeState;
@@ -102,8 +102,7 @@ pub struct JoinHashTable {
pub(crate) interrupt: Arc<AtomicBool>,
/// Finalize tasks
pub(crate) worker_num: Arc<AtomicU32>,
pub(crate) finalize_tasks: Arc<RwLock<Vec<(usize, usize)>>>,
pub(crate) unfinished_task_num: Arc<AtomicI32>,
pub(crate) finalize_tasks: Arc<RwLock<VecDeque<(usize, usize)>>>,
}

impl JoinHashTable {
@@ -165,8 +164,7 @@ impl JoinHashTable {
probe_schema: probe_data_schema,
interrupt: Arc::new(AtomicBool::new(false)),
worker_num: Arc::new(AtomicU32::new(0)),
finalize_tasks: Arc::new(RwLock::new(vec![])),
unfinished_task_num: Arc::new(AtomicI32::new(0)),
finalize_tasks: Arc::new(RwLock::new(VecDeque::new())),
})
}

Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ impl JoinHashTable {
let build_indexes = &mut probe_state.build_indexes;
let build_indexes_ptr = build_indexes.as_mut_ptr();

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ impl JoinHashTable {
let local_build_indexes_ptr = local_build_indexes.as_mut_ptr();
let mut validity = MutableBitmap::with_capacity(JOIN_MAX_BLOCK_SIZE);

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ impl JoinHashTable {
let build_indexes = &mut probe_state.build_indexes;
let build_indexes_ptr = build_indexes.as_mut_ptr();

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -133,7 +133,7 @@ impl JoinHashTable {
let build_indexes = &mut probe_state.build_indexes;
let build_indexes_ptr = build_indexes.as_mut_ptr();

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ impl JoinHashTable {
let local_build_indexes_ptr = local_build_indexes.as_mut_ptr();
let mut validity = MutableBitmap::with_capacity(JOIN_MAX_BLOCK_SIZE);

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ impl JoinHashTable {
let build_indexes = &mut probe_state.build_indexes;
let build_indexes_ptr = build_indexes.as_mut_ptr();

let data_blocks = self.row_space.chunks.read().unwrap();
let data_blocks = self.row_space.chunks.read();
let data_blocks = data_blocks
.iter()
.map(|c| &c.data_block)
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::RwLock;

use common_exception::Result;
use common_expression::types::DataType;
@@ -23,6 +22,7 @@ use common_expression::DataSchemaRef;
use common_expression::KeysState;
use common_hashtable::RowPtr;
use common_storages_fuse::TableContext;
use parking_lot::RwLock;

use crate::sessions::QueryContext;

@@ -65,15 +65,15 @@ impl RowSpace {

{
// Acquire write lock in current scope
let mut chunks = self.chunks.write().unwrap();
let mut chunks = self.chunks.write();
chunks.push(chunk);
}

Ok(())
}

pub fn datablocks(&self) -> Vec<DataBlock> {
let chunks = self.chunks.read().unwrap();
let chunks = self.chunks.read();
chunks.iter().map(|c| c.data_block.clone()).collect()
}

Loading
Oops, something went wrong.

0 comments on commit e03ea5e

Please sign in to comment.