Skip to content

Commit

Permalink
Overhaul of disk-based intermediate file handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Henderson committed Sep 27, 2013
1 parent a77f646 commit cba7a00
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 237 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ _ReSharper*/
# Office Temp Files
~$*
tests/mrtest/data
examples/wordcount/mapreduce_?_of_?
70 changes: 62 additions & 8 deletions examples/wordcount/wordcount.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2009-2013 Craig Henderson
// https://github.com/cdmh/mapreduce

#define DEBUG_TRACE_OUTPUT
#define BOOST_DISABLE_ASSERTS
#if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS)
# pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
Expand All @@ -23,7 +24,9 @@
#include <iostream>

template<>
inline uintmax_t const mapreduce::detail::length(std::pair<char const *, uintmax_t> const &string)
inline
uintmax_t const
mapreduce::detail::length(std::pair<char const *, uintmax_t> const &string)
{
return string.second;
}
Expand Down Expand Up @@ -185,10 +188,59 @@ void run_wordcount(mapreduce::specification const &spec)

} // anonymous namespace

// specialized stream operators to read and write key/value pairs of the types used by the reduce task
// Writes one key/value record as three tab-separated fields:
//   <key length> '\t' <raw key bytes> '\t' <value>
// The key is given as a (pointer, length) pair; exactly `length` bytes are
// written from the pointer, so no NUL terminator is needed.
inline
std::basic_ostream<char, std::char_traits<char>> &
operator<<(
    std::basic_ostream<char, std::char_traits<char>> &out,
    std::pair<std::pair<char const *, std::uintmax_t>, unsigned> const &keyvalue)
{
    char const * const   key_bytes  = keyvalue.first.first;
    std::uintmax_t const key_length = keyvalue.first.second;
    unsigned const       count      = keyvalue.second;

    // the length prefix lets the reader extract keys containing whitespace
    out << key_length;
    out << "\t";
    out.write(key_bytes, key_length);
    out << "\t";
    out << count;
    return out;
}

inline
std::basic_ostream<char, std::char_traits<char>> &
operator<<(
std::basic_ostream<char, std::char_traits<char>> &out,
std::pair<std::string, unsigned> const &keyvalue)
{
out <<
std::make_pair(
std::make_pair(keyvalue.first.c_str(), keyvalue.first.length()),
keyvalue.second);
return out;
}

// Reads one key/value record consisting of three tab-separated fields:
//   <key length> '\t' <raw key bytes> '\t' <value>
//
// in        stream positioned at the start of a record
// keyvalue  receives the key (first) and the value (second)
//
// Returns `in`. If the length cannot be read, or the stream is at
// end-of-file after reading it, nothing further is consumed and keyvalue is
// left untouched. A missing delimiter or a truncated key sets failbit on the
// stream and stops parsing, so callers can detect a malformed record by
// testing the stream state; in that case keyvalue may be partially updated.
inline
std::basic_istream<char, std::char_traits<char>> &
operator>>(
    std::basic_istream<char, std::char_traits<char>> &in,
    std::pair<std::string, unsigned> &keyvalue)
{
    size_t length = 0;
    in >> length;
    if (in.eof() || in.fail())
        return in;

    // Consumes one character and verifies it is the field delimiter. This is
    // a runtime check rather than an assert so it is still enforced when
    // asserts are compiled out.
    auto const consume_tab = [&in]() -> bool {
        if (in.get() == '\t')
            return true;
        in.setstate(std::ios_base::failbit);
        return false;
    };

    if (!consume_tab())
        return in;

    keyvalue.first.resize(length);
    if (length > 0)
    {
        // The key is read as raw bytes because it may contain whitespace.
        // The length guard avoids taking the address of the first element
        // of an empty string.
        if (!in.read(&keyvalue.first[0], static_cast<std::streamsize>(length)))
            return in;      // truncated key: read() has already set failbit
    }

    if (!consume_tab())
        return in;

    in >> keyvalue.second;
    return in;
}

int main(int argc, char **argv)
{
#ifdef _CRTDBG_REPORT_FLAG
_CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF);
#endif

std::cout << "MapReduce Word Frequency Application";
if (argc < 2)
{
Expand All @@ -208,6 +260,7 @@ int main(int argc, char **argv)
spec.reduce_tasks = std::max(1U, std::thread::hardware_concurrency());

std::cout << "\n" << std::max(1U, std::thread::hardware_concurrency()) << " CPU cores";
#if 0
run_wordcount<
mapreduce::job<
wordcount::map_task,
Expand All @@ -218,14 +271,15 @@ int main(int argc, char **argv)
wordcount::map_task,
wordcount::reduce_task,
wordcount::combiner> >(spec);
#endif

//run_wordcount<
// mapreduce::job<
// wordcount::map_task,
// wordcount::reduce_task,
// wordcount::combiner,
// mapreduce::datasource::directory_iterator<wordcount::map_task>,
// mapreduce::intermediates::local_disk<wordcount::map_task, wordcount::reduce_task>>>(spec);
run_wordcount<
mapreduce::job<
wordcount::map_task,
wordcount::reduce_task,
mapreduce::null_combiner,// mapreduce::null_combiner, needs to work for here too
mapreduce::datasource::directory_iterator<wordcount::map_task>,
mapreduce::intermediates::local_disk<wordcount::map_task, wordcount::reduce_task>>>(spec);

return 0;
}
Expand Down
20 changes: 6 additions & 14 deletions examples/wordcount/wordcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ struct map_task : public mapreduce::map_task<
}
};

struct reduce_task : public mapreduce::reduce_task<
std::pair<char const *, std::uintmax_t>,
unsigned>
struct reduce_task : public mapreduce::reduce_task<std::string, unsigned>
{
template<typename Runtime, typename It>
void operator()(Runtime &runtime, key_type const &key, It it, It const ite) const
Expand All @@ -56,13 +54,6 @@ struct reduce_task : public mapreduce::reduce_task<
class combiner
{
public:
template<typename IntermediateStore>
static void run(IntermediateStore &intermediate_store)
{
combiner instance;
intermediate_store.combine(instance);
}

void start(reduce_task::key_type const &)
{
total_ = 0;
Expand All @@ -72,16 +63,17 @@ class combiner
void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
{
if (total_ > 0)
intermediate_store.insert(key, total_);
{
// the combiner needs to emit an intermediate result, not a final result, so
// here we convert the type from std::string (final) to intermediate (ptr/length)
intermediate_store.insert(std::make_pair(key.c_str(), key.length()), total_);
}
}

void operator()(reduce_task::value_type const &value)
{
total_ += value;
}

private:
combiner() { }

private:
unsigned total_;
Expand Down
Loading

0 comments on commit cba7a00

Please sign in to comment.