Skip to content

Commit

Permalink
Remove SchedulingParams variants of ThreadPool::TryParallelFor (#5050)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlh20 authored Sep 3, 2020
1 parent fde7a2c commit bbb9d92
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 117 deletions.
89 changes: 0 additions & 89 deletions include/onnxruntime/core/platform/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,67 +52,6 @@ class LoopCounter;

class ThreadPool {
public:
// How ParallelFor distributes the requested units of work across the
// threads of the pool.
enum class SchedulingStrategy {
  // Adaptive scheduling: shard sizes are derived from the caller-supplied
  // cost estimate and the underlying threadpool device's cost model.
  //
  // 'cost_per_unit' approximates the CPU cycles (or nanoseconds, if the
  // work is not CPU-bound) needed for one unit of work. Overestimating it
  // produces many tiny shards whose per-shard overhead (e.g. Context
  // creation) dominates; underestimating it can leave parallelism unused
  // and cause load-balancing problems and stragglers.
  kAdaptive,
  // Fixed-block-size scheduling: work is cut into shards of exactly
  // 'block_size' units, except that when the total is not a multiple of
  // 'block_size' at most one shard is smaller. The resulting shard count
  // is reported by NumShardsUsedByFixedBlockSizeScheduling.
  //
  // Shards may run concurrently on different pool threads; when the pool
  // has fewer threads than shards, the remaining invocations are queued
  // automatically.
  kFixedBlockSize
};

// Per-call parameters accompanying a SchedulingStrategy. Only the field
// relevant to the chosen strategy is expected to hold a value.
class SchedulingParams {
 public:
  // `cost_per_unit` applies to kAdaptive; `block_size` applies to
  // kFixedBlockSize. The unused one may be left empty.
  explicit SchedulingParams(SchedulingStrategy strategy, optional<int64_t> cost_per_unit,
                            optional<std::ptrdiff_t> block_size)
      : strategy_kind_(strategy), unit_cost_(cost_per_unit), shard_size_(block_size) {
  }

  // The strategy these parameters belong to.
  SchedulingStrategy strategy() const { return strategy_kind_; }
  // Estimated CPU cycles (or nanoseconds if not CPU-bound) per unit of
  // work; meaningful only for kAdaptive.
  optional<int64_t> cost_per_unit() const { return unit_cost_; }
  // Number of units per shard; meaningful only for kFixedBlockSize.
  optional<std::ptrdiff_t> block_size() const { return shard_size_; }

 private:
  SchedulingStrategy strategy_kind_;
  optional<int64_t> unit_cost_;
  optional<std::ptrdiff_t> shard_size_;
};
#ifdef _WIN32
using NAME_CHAR_TYPE = wchar_t;
#else
Expand Down Expand Up @@ -192,34 +131,6 @@ class ThreadPool {
#endif
}

// Similar to ParallelFor above, but takes the specified scheduling strategy
// into account.
void ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params,
const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn);

// Static convenience wrapper: runs `fn` over the half-open sub-ranges of
// [0, total), tolerating a null pool and an OpenMP build. In the OpenMP
// build the scheduling params are ignored and OpenMP performs the
// scheduling instead.
static void TryParallelFor(concurrency::ThreadPool* tp, std::ptrdiff_t total,
const SchedulingParams& scheduling_params,
const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn) {
#ifdef _OPENMP
// OpenMP path: split the range into one contiguous chunk per thread,
// capped at `total` so no chunk is empty, and let OpenMP dispatch them.
ORT_UNUSED_PARAMETER(scheduling_params);
std::ptrdiff_t num_threads = concurrency::ThreadPool::DegreeOfParallelism(tp);
if (total < num_threads) {
num_threads = total;
}
#pragma omp parallel for
for (std::ptrdiff_t i = 0; i < num_threads; i++) {
auto work = PartitionWork(i, num_threads, total);
fn(work.start, work.end);
}
#else
// No pool available: execute the entire range inline on the caller's thread.
if (tp == nullptr) {
fn(0, total);
return;
}
// Delegate to the pool, which honours the requested scheduling strategy.
tp->ParallelFor(total, scheduling_params, fn);
#endif
}

// Return the degree of parallelism that code should assume when using the thread pool.
// This API takes into account if OpenMP is enabled/disabled, and if the thread pool ptr is
// nullptr. It decouples the degree of parallelism for use with the thread pool from
Expand Down
18 changes: 0 additions & 18 deletions onnxruntime/core/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,24 +242,6 @@ int ThreadPool::NumShardsUsedByFixedBlockSizeScheduling(const std::ptrdiff_t tot
}
}

void ThreadPool::ParallelFor(std::ptrdiff_t total, const SchedulingParams& scheduling_params,
const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn) {
switch (scheduling_params.strategy()) {
case SchedulingStrategy::kAdaptive: {
if (scheduling_params.cost_per_unit().has_value()) {
ParallelFor(total, static_cast<double>(scheduling_params.cost_per_unit().value()), fn);
}
break;
}
case SchedulingStrategy::kFixedBlockSize: {
if (scheduling_params.block_size().has_value()) {
ParallelForFixedBlockSizeScheduling(total, scheduling_params.block_size().value(), fn);
}
break;
}
}
}

using CostModel = Eigen::TensorCostModel<Eigen::ThreadPoolDevice>;

// Calculates block size based on (1) the iteration cost and (2) parallel
Expand Down
2 changes: 2 additions & 0 deletions onnxruntime/test/onnx/microbenchmark/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <new>
#include <random>

#include "core/common/common.h"

// aligned memory allocate and free functions
inline void* aligned_alloc(size_t size, size_t align) {
void* ptr;
Expand Down
29 changes: 19 additions & 10 deletions onnxruntime/test/onnx/microbenchmark/gelu.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,26 @@ static void BM_GeluBatchParallelFor3(benchmark::State& state) {
tpo.auto_set_affinity = true;
std::unique_ptr<concurrency::ThreadPool> tp(
concurrency::CreateThreadPool(&onnxruntime::Env::Default(), tpo, concurrency::ThreadPoolType::INTRA_OP));
concurrency::ThreadPool::SchedulingParams p(concurrency::ThreadPool::SchedulingStrategy::kFixedBlockSize, optional<int64_t>(), 4096);

// Divide work into chunks of 4096 iterations
const int64_t length_per_task = 4096;
const int64_t task_count = (batch_size + length_per_task - 1) / length_per_task;

for (auto _ : state) {
tp->ParallelFor(batch_size, p, [data, output](ptrdiff_t first, ptrdiff_t last) {
ptrdiff_t len = last - first;
float* output_ptr = output + first;
onnxruntime::ConstEigenVectorArrayMap<float> xm(data + first, len);
onnxruntime::EigenVectorArrayMap<float> ym(output_ptr, len);
ym = xm * static_cast<float>(M_SQRT1_2);
MlasComputeErf(output_ptr, output_ptr, len);
ym = xm * 0.5f * (ym + 1.0f);
});
concurrency::ThreadPool::TryBatchParallelFor(
tp.get(),
static_cast<ptrdiff_t>(task_count),
[batch_size, data, length_per_task, output](ptrdiff_t task_idx) {
const auto first = task_idx * length_per_task;
const ptrdiff_t len = std::min(length_per_task, static_cast<int64_t>(batch_size - first));
float* output_ptr = output + first;
onnxruntime::ConstEigenVectorArrayMap<float> xm(data + first, len);
onnxruntime::EigenVectorArrayMap<float> ym(output_ptr, len);
ym = xm * static_cast<float>(M_SQRT1_2);
MlasComputeErf(output_ptr, output_ptr, len);
ym = xm * 0.5f * (ym + 1.0f);
},
0);
}
aligned_free(data);
aligned_free(output);
Expand Down

0 comments on commit bbb9d92

Please sign in to comment.