Skip to content

Commit

Permalink
Completed overhaul of disk-based intermediate file handling and compa…
Browse files Browse the repository at this point in the history
…tibility with in-memory intermediates
  • Loading branch information
Craig Henderson committed Sep 29, 2013
1 parent cba7a00 commit 9f6d16c
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 50 deletions.
66 changes: 54 additions & 12 deletions examples/wordcount/wordcount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
#include <iostream>

template<>
inline
uintmax_t const
mapreduce::detail::length(std::pair<char const *, uintmax_t> const &string)
inline uintmax_t const mapreduce::length(std::pair<char const *, uintmax_t> const &string)
{
return string.second;
}

// Specialisation of mapreduce::data for keys held as a (pointer, length)
// pair: the character pointer is the pair's first member and is handed back
// unchanged (no copy of the characters is made).
template<>
inline char const * const mapreduce::data(std::pair<char const *, uintmax_t> const &string)
{
    char const * const begin = string.first;
    return begin;
}


template<>
bool std::less<std::pair<char const *, std::uintmax_t> >::operator()(
Expand Down Expand Up @@ -87,6 +91,9 @@ double const sum(T const &durations)

void write_stats(mapreduce::results const &result)
{
if (result.map_times.size() == 0 || result.reduce_times.size() == 0)
return;

std::cout << std::endl << "\nMapReduce statistics:";
std::cout << "\n MapReduce job runtime : " << result.job_runtime.count() << "s of which...";
std::cout << "\n Map phase runtime : " << result.map_runtime.count() << "s";
Expand Down Expand Up @@ -182,7 +189,7 @@ void run_wordcount(mapreduce::specification const &spec)
}
catch (std::exception &e)
{
std::cout << std::endl << "Error: " << e.what();
std::cout << "\nError: " << e.what();
}
}

Expand Down Expand Up @@ -260,26 +267,61 @@ int main(int argc, char **argv)
spec.reduce_tasks = std::max(1U, std::thread::hardware_concurrency());

std::cout << "\n" << std::max(1U, std::thread::hardware_concurrency()) << " CPU cores";
#if 0

/*
the tests below are in pairs: each test runs first without a
functional combiner, and then with a combiner object
*/

// test using a reduce key of a char pointer and length, to
// a memory-mapped buffer of text. this will work only for
// in-memory intermediates where the memory-mapped buffer
// lifetime exceeds the duration of the map reduce job
run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task> >(spec);
wordcount::reduce_task<std::pair<char const *, std::uintmax_t>>> >(spec);

run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task,
wordcount::combiner> >(spec);
#endif
wordcount::reduce_task<std::pair<char const *, std::uintmax_t>>,
wordcount::combiner<wordcount::reduce_task<std::pair<char const *, std::uintmax_t>>>>>(spec);

// these are functionally the same as the jobs above, but use std::string
// as the reduce key so the char buffer is owned by the intermediate store.
// this is less efficient, but more robust if the memory-mapped buffer
// may go out of scope
run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task<std::string>> >(spec);

run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task<std::string>,
wordcount::combiner<wordcount::reduce_task<std::string>>>>(spec);

// because the intermediates are stored on disk and read back during the reduce
// phase, the reduce keys must own their own storage, so std::string is used
run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task<std::string>,
mapreduce::null_combiner,
mapreduce::datasource::directory_iterator<wordcount::map_task>,
mapreduce::intermediates::local_disk<
wordcount::map_task, wordcount::reduce_task<std::string>>>>(spec);

run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task,
mapreduce::null_combiner,// mapreduce::null_combiner, needs to work for here too
wordcount::reduce_task<std::string>,
wordcount::combiner<wordcount::reduce_task<std::string>>,
mapreduce::datasource::directory_iterator<wordcount::map_task>,
mapreduce::intermediates::local_disk<wordcount::map_task, wordcount::reduce_task>>>(spec);
mapreduce::intermediates::local_disk<
wordcount::map_task, wordcount::reduce_task<std::string>>>>(spec);

return 0;
}
Expand Down
15 changes: 10 additions & 5 deletions examples/wordcount/wordcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ struct map_task : public mapreduce::map_task<
}
};

struct reduce_task : public mapreduce::reduce_task<std::string, unsigned>
template<typename KeyType>
struct reduce_task : public mapreduce::reduce_task<KeyType, unsigned>
{
template<typename Runtime, typename It>
void operator()(Runtime &runtime, key_type const &key, It it, It const ite) const
Expand All @@ -51,26 +52,30 @@ struct reduce_task : public mapreduce::reduce_task<std::string, unsigned>
}
};

template<typename ReduceTask>
class combiner
{
public:
void start(reduce_task::key_type const &)
void start(typename ReduceTask::key_type const &)
{
total_ = 0;
}

template<typename IntermediateStore>
void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
void finish(typename ReduceTask::key_type const &key, IntermediateStore &intermediate_store)
{
if (total_ > 0)
{
// the combiner needs to emit an intermediate result, not a final result, so
// here we convert the type from std::string (final) to intermediate (ptr/length)
intermediate_store.insert(std::make_pair(key.c_str(), key.length()), total_);
intermediate_store.insert(
std::make_pair(
mapreduce::data(key),
mapreduce::length(key)), total_);
}
}

void operator()(reduce_task::value_type const &value)
void operator()(typename ReduceTask::value_type const &value)
{
total_ += value;
}
Expand Down
48 changes: 40 additions & 8 deletions include/detail/intermediates/in_memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ namespace mapreduce {

namespace intermediates {

namespace detail {

using namespace ::mapreduce::detail;

// Converts a value of type MapValueType into a ReduceKeyType.
//
// Generic case: the value is returned as-is, so MapValueType must be
// implicitly convertible to ReduceKeyType.
template<typename ReduceKeyType, typename MapValueType>
inline ReduceKeyType convert(MapValueType const &value)
{
    return value;
}

// (pointer, length) pair -> std::string.
// Builds a string that owns a copy of value.second characters starting at
// value.first.
template<>
inline std::string convert(std::pair<char const *, std::uintmax_t> const &value)
{
    std::string owned(value.first, value.second);
    return owned;
}

// std::string -> (pointer, length) pair.
// The returned pointer refers to the character buffer of `value`; no copy is
// made, so the pair must not be used after `value` is destroyed or modified.
template<>
inline std::pair<char const *, std::uintmax_t> convert(std::string const &value)
{
    return std::pair<char const *, std::uintmax_t>(value.c_str(), value.length());
}

} // namespace detail

template<typename MapTask, typename ReduceTask>
class reduce_null_output
{
Expand All @@ -22,9 +46,10 @@ class reduce_null_output
{
}

void operator()(typename ReduceTask::key_type const &/*key*/,
typename ReduceTask::value_type const &/*value*/)
bool const operator()(typename ReduceTask::key_type const &/*key*/,
typename ReduceTask::value_type const &/*value*/)
{
return true;
}
};

Expand Down Expand Up @@ -215,7 +240,7 @@ class in_memory : detail::noncopyable

for (typename map_type::iterator it=other_map.begin(); it!=other_map.end(); ++it)
{
typename map_type::iterator iti = map.insert(make_pair(it->first, typename map_type::mapped_type())).first;
typename map_type::iterator iti = map.insert(std::make_pair(it->first, typename map_type::mapped_type())).first;
std::copy(it->second.begin(), it->second.end(), std::back_inserter(iti->second));
}
}
Expand All @@ -227,24 +252,27 @@ class in_memory : detail::noncopyable
other.intermediates_.clear();
}

// receive final result
//
// The (key, value) pair is first handed to store_result. Only if that call
// yields true is the pair also inserted into this store, via the
// two-argument insert() overload. That overload takes its key as the map
// task's value_type, so the reduce key is converted to that type first.
//
// Returns true only when both store_result and the insert succeed.
template<typename StoreResult>
bool const insert(typename reduce_task_type::key_type   const &key,
                  typename reduce_task_type::value_type const &value,
                  StoreResult &store_result)
{
    // map_task_type::value_type is a dependent name, so it needs the
    // `typename` disambiguator when used as a template argument
    return store_result(key, value)
        && insert(detail::convert<typename map_task_type::value_type>(key), value);
}

bool const insert(typename reduce_task_type::key_type const &key,
// receive intermediate result
bool const insert(typename map_task_type::value_type const &key,
typename reduce_task_type::value_type const &value)
{
unsigned const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
typename intermediates_t::value_type &map = intermediates_[partition];

auto reduce_key = detail::convert<typename reduce_task_type::key_type>(key);
map.insert(
make_pair(
key,
std::make_pair(
reduce_key,
typename intermediates_t::value_type::mapped_type())).first->second.push_back(value);

return true;
Expand Down Expand Up @@ -274,6 +302,10 @@ class in_memory : detail::noncopyable
}
}

// combine() overload taking a null_combiner: the body is empty, so calling
// it leaves the stored intermediates untouched.
void combine(null_combiner &)
{
}

private:
unsigned const num_partitions_;
intermediates_t intermediates_;
Expand Down
17 changes: 5 additions & 12 deletions include/detail/intermediates/local_disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ struct null_combiner;

namespace detail {

// Generic length helper: forwards to the argument's own length() member and
// returns the result as a uintmax_t. T must therefore provide length().
template<typename T>
inline
uintmax_t const length(T const &value)
{
    uintmax_t const size = value.length();
    return size;
}

struct file_lines_comp
{
template<typename T>
Expand Down Expand Up @@ -82,7 +75,7 @@ struct file_merger
std::string line;
std::getline(*it->first, line, '\r');

if (detail::length(line) > 0)
if (length(line) > 0)
{
std::istringstream l(line);
l >> it->second;
Expand Down Expand Up @@ -567,21 +560,21 @@ class local_disk : detail::noncopyable
std::ifstream infile(filename.c_str());
while (!(infile >> kv).eof())
{
if (kv.first != last_key && detail::length(kv.first) > 0)
if (kv.first != last_key && length(kv.first) > 0)
{
if (detail::length(last_key) > 0)
if (length(last_key) > 0)
{
callback(last_key, values.begin(), values.end());
values.clear();
}
if (detail::length(kv.first) > 0)
if (length(kv.first) > 0)
std::swap(kv.first, last_key);
}

values.push_back(kv.second);
}

if (detail::length(last_key) > 0)
if (length(last_key) > 0)
callback(last_key, values.begin(), values.end());

infile.close();
Expand Down
24 changes: 14 additions & 10 deletions include/detail/job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,20 @@

namespace mapreduce {

template<typename T> size_t length(T const &str);
// Accessors for a key's character count and character buffer. The primary
// templates are declared only; a specialisation is supplied per key type.
template<typename T> uintmax_t const length(T const &str);
template<typename T> char const * const data(T const &str);

// std::string: number of characters held by the string
template<>
inline uintmax_t const length(std::string const &str)
{
    return str.size();
}

// std::string: pointer to the string's own character buffer (not a copy)
template<>
inline char const * const data(std::string const &str)
{
    return str.c_str();
}

template<typename MapKey, typename MapValue>
class map_task
Expand Down Expand Up @@ -66,9 +79,6 @@ class job : detail::noncopyable

// consolidating map intermediate results can save time by
// aggregating the mapped valued at mapper
#ifdef DEBUG_TRACE_OUTPUT
std::clog << "\nRunning combiner...";
#endif
combiner_type instance;
intermediate_store_.combine(instance);

Expand Down Expand Up @@ -265,12 +275,6 @@ class job : detail::noncopyable
intermediate_store_type intermediate_store_;
};

// std::string specialisation of length(): the number of characters in str
template<>
inline size_t length(std::string const &str)
{
    return str.size();
}

} // namespace mapreduce

// Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down
2 changes: 1 addition & 1 deletion include/detail/schedule_policy/cpu_parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class cpu_parallel : mapreduce::detail::noncopyable
}
map_threads.join_all();
result.map_runtime = std::chrono::system_clock::now() - start_time;
result.counters.actual_map_tasks = map_tasks;
result.counters.actual_map_tasks = map_tasks;
}

void intermediate(Job &job, results &result)
Expand Down
14 changes: 12 additions & 2 deletions include/mapreduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ class joined_thread_group : public std::vector<std::thread>
// Waits for every thread in the group to finish.
//
// Joining is best-effort: a failure to join one thread is ignored so that
// the remaining threads are still joined.
void join_all(void)
{
    for (auto &thread : *this)
    {
        // join() on a std::thread that is not joinable throws
        // std::system_error ("invalid argument"); test for that case up
        // front rather than relying on the exception
        if (!thread.joinable())
            continue;

        try
        {
            thread.join();
        }
        catch (std::exception const &)
        {
            // deliberately swallowed: keep joining the other threads
        }
    }
}
};

Expand Down Expand Up @@ -128,10 +138,10 @@ struct results
#include <boost/throw_exception.hpp>
#include "detail/platform.hpp"
#include "detail/mergesort.hpp"
#include "detail/null_combiner.hpp"
#include "detail/intermediates.hpp"
#include "detail/schedule_policy.hpp"
#include "detail/datasource.hpp"
#include "detail/null_combiner.hpp"
#include "detail/job.hpp"

namespace mapreduce {
Expand Down

0 comments on commit 9f6d16c

Please sign in to comment.