Skip to content

Commit

Permalink
Unblock HdfsScanNode::DiskThread to match the threads started in the …
Browse files Browse the repository at this point in the history
…io mgr.
  • Loading branch information
Nong Li authored and lskuff committed Apr 29, 2013
1 parent 6141869 commit 4aab583
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 8 additions & 1 deletion be/src/exec/hdfs-scan-node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -682,10 +682,17 @@ void HdfsScanNode::DiskThread() {
unique_lock<mutex> lock(disk_thread_resource_lock_);
DCHECK_LE(num_blocked_scanners_, active_scanners_.size());
// Wait if the number of buffers is at the max limit (to generate back pressure
// on the io mgr) AND all scanners can make progress.
// on the io mgr) AND all scanners can make progress AND there aren't any ranges
// started in the io mgr that don't have a scanner thread processing them.
// Thread tokens are acquired in the io mgr (i.e. num_optional_threads()) and then
// get queued in the io mgr queue. Only when this thread reads them, is a scanner
// thread (active_scanners_) created. We don't want to actively be starting new
// scanner threads if possible.
DCHECK_LE(num_blocked_scanners_, active_scanners_.size());
int num_started_ranges = runtime_state_->resource_pool()->num_optional_threads();
while (num_blocked_scanners_ == 0 &&
num_queued_io_buffers_ >= max_queued_io_buffers_ &&
num_started_ranges == active_scanners_.size() &&
!done_) {
disk_thread_resource_cv_.wait(lock);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ class Expr {
// if thread_safe != NULL, it will on return, contain whether the resulting codegen
// function is thread safe.
static Status Prepare(Expr* root, RuntimeState* state, const RowDescriptor& row_desc,
bool disable_codegen = true, bool* thread_safe = false);
bool disable_codegen = true, bool* thread_safe = NULL);

// Prepare all exprs.
static Status Prepare(const std::vector<Expr*>& exprs, RuntimeState* state,
Expand Down

0 comments on commit 4aab583

Please sign in to comment.