CSC/ECE 506 Spring 2012/3b sk
= Introduction =


MapReduce is a software framework introduced by Google in 2004 to support [http://publib.boulder.ibm.com/infocenter/txformp/v6r0m0/index.jsp?topic=%2Fcom.ibm.cics.te.doc%2Ferziaz0015.htm distributed computing] on large data sets on clusters of computers.
The MapReduce programming model consists of two major steps: map and reduce. In the map step, the problem being solved is divided into a series of sub-problems and distributed to different workers. After collecting results from workers, the computation enters the reduce step to combine and produce the final result.


= Programming Model =
[[File:Mapreduce.png|thumbnail|MapReduce for a Shape Counter]]<br>
The MapReduce programming model is inspired by [http://enfranchisedmind.com/blog/posts/what-is-a-functional-programming-language/ functional languages] and targets data-intensive computations. The input data format is application-specific, and is specified by the user. The output is a set of <key,value> pairs. The user expresses an algorithm using two functions, Map and Reduce. The Map function is applied on the input data and produces a list of intermediate <key,value> pairs. The Reduce function is applied to all intermediate pairs with the same key. It typically performs some kind of merging operation and produces zero or more output pairs. Finally, the output pairs are sorted by their key value. In the simplest form of MapReduce programs, the programmer provides just the Map function. All other functionality, including the grouping of the intermediate pairs which have the same key and the final sorting, is provided by the runtime.
The main benefit of this model is simplicity. The programmer provides a simple description of the algorithm that focuses on functionality and not on parallelization. The actual parallelization and the details of concurrency management are left to the runtime system. Hence the program code is generic and easily portable across systems. Nevertheless, the model provides sufficient high-level information for parallelization. The Map function can be executed in parallel on non-overlapping portions of the input data and the Reduce function can be executed in parallel on each set of intermediate pairs with the same key. Similarly, since it is explicitly known which pairs each function will operate upon, one can employ pre-fetching or other scheduling optimizations for locality.


= Examples =
Below are a few simple examples of programs that can be easily expressed as MapReduce computations.
*Distributed [http://unixhelp.ed.ac.uk/CGI/man-cgi?grep Grep]: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output. <br>
*Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair. <br>
*[http://books.google.com/books?id=gJrmszNHQV4C&pg=PA376&lpg=PA376&dq=what+is+reverse+web+link+graph&source=bl&ots=rLQ2yuV6oc&sig=wimcG_7MR7d9g-ePGXkEK1ANmws&hl=en&sa=X&ei=BtxBT5HkN42DtgefhbXRBQ&ved=0CEwQ6AEwBg#v=onepage&q=what%20is%20reverse%20web%20link%20graph&f=false Reverse Web-Link Graph]: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.<br>
*Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.<br>
*[http://nlp.stanford.edu/IR-book/html/htmledition/a-first-take-at-building-an-inverted-index-1.html Inverted Index]: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.
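
As a rough illustration of the inverted-index example above, the following self-contained C++ sketch runs the map and reduce logic sequentially in one process; a real MapReduce runtime would distribute the same logic across many machines. The document contents and variable names are made up for the example.
<pre>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

int main() {
    // Hypothetical input: <document ID, document text> pairs.
    std::vector<std::pair<int, std::string>> documents = {
        {1, "map reduce map"}, {2, "reduce cluster"}};

    // Map phase: emit a <word, document ID> pair for every word occurrence.
    std::multimap<std::string, int> intermediate;
    for (const auto& doc : documents) {
        std::istringstream words(doc.second);
        std::string w;
        while (words >> w) intermediate.emplace(w, doc.first);
    }

    // Reduce phase: for each word, collect and sort its document IDs.
    std::map<std::string, std::set<int>> index;
    for (const auto& kv : intermediate) index[kv.first].insert(kv.second);

    // Print the resulting <word, list(document ID)> pairs.
    for (const auto& entry : index) {
        std::cout << entry.first << ":";
        for (int id : entry.second) std::cout << " " << id;
        std::cout << "\n";
    }
}
</pre>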


= Sample Code =
The following pseudocode shows the basic structure of a MapReduce program that counts the number of occurrences of each word in a collection of documents.
<pre>
//Input : a Document
//Intermediate Output: key = word, value = 1
Map(void * input){
   for each word w in input
       EmitIntermediate(w, 1)
}

//Intermediate Output: key = word, value = 1
//Output : key = word, value = occurrences
Reduce(String key, Iterator values){
   int result = 0;
   for each v in values
       result += v
   Emit(key, result)
}
</pre>


= Runtime System =
The MapReduce runtime is responsible for parallelization and concurrency control. To parallelize the Map function, it splits the input pairs into units that are processed concurrently on multiple nodes. Next, the runtime partitions the intermediate pairs using a scheme that keeps pairs with the same key in the same unit. The partitions are processed in parallel by Reduce tasks running on multiple nodes. In both steps, the runtime must decide on factors such as the size of the units, the number of nodes involved, how units are assigned to nodes dynamically, and how buffer space is allocated. The decisions can be fully automatic or guided by the programmer given application specific knowledge. These decisions allow the runtime to execute a program efficiently across a wide range of machines and dataset scenarios without modifications to the source code. Finally, the runtime must merge and sort the output pairs from all Reduce tasks.


= Implementations =
Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large [http://msdn.microsoft.com/en-us/library/ms178144.aspx NUMA] multi-processor, and yet another for an even larger collection of networked machines. Phoenix implements MapReduce for shared-memory systems. Hadoop and Google's MapReduce target large clusters of commodity PCs connected together with switched Ethernet. Mars is a MapReduce framework on graphics processors ([http://www.nvidia.com/object/gpu.html GPUs]).


== Google's MapReduce ==
Google's MapReduce implements MapReduce for large clusters of commodity PCs connected together with switched Ethernet.


=== Execution Overview ===
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and
the partitioning function are specified by the user.
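
As a minimal sketch of such a partitioning function (the hash choice here is illustrative, not Google's actual implementation), every intermediate key is mapped to one of the R reduce partitions, so all pairs with the same key land in the same partition:
<pre>
#include <functional>
#include <string>

// Assign an intermediate key to one of R reduce partitions (hash(key) mod R).
int PartitionFor(const std::string& key, int R) {
    return static_cast<int>(std::hash<std::string>{}(key) % R);
}
</pre>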
 
[[File:Google Map Reduce.jpg]] <br>
The figure above shows the overall flow of a MapReduce operation in Google's implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in the figure above correspond to the numbers in the list below):


1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.


4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.


6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file; they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.


=== Master Data Structures ===


The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks).
The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
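
A rough sketch of the bookkeeping this implies is shown below; the type and field names are assumptions for illustration, not Google's actual code.
<pre>
#include <cstddef>
#include <string>
#include <vector>

enum class TaskState { Idle, InProgress, Completed };

// One intermediate file region produced by a completed map task.
struct FileRegion {
    std::string location;   // where the region lives on the map worker's local disk
    size_t size;            // region size in bytes
};

// Per-task state kept by the master for every map and reduce task.
struct TaskInfo {
    TaskState state = TaskState::Idle;
    std::string worker;                // identity of the worker machine (non-idle tasks)
    std::vector<FileRegion> regions;   // the R regions, filled in as map tasks complete
};
</pre>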


=== Fault Tolerance ===


Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.

==== Worker Failure ====


The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling
on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling. Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system. When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B. MapReduce is resilient to large-scale worker failures. For example, during one MapReduce operation, network maintenance on a running cluster was causing groups of 80 machines at a time to become unreachable for several minutes. The MapReduce master simply re-executed the work done by the unreachable worker machines, and continued to make forward progress, eventually completing the MapReduce operation.
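
A highly simplified sketch of the master's health check described above is given below; the structure and helper steps are assumptions for illustration only.
<pre>
#include <chrono>
#include <string>
#include <unordered_map>

struct WorkerStatus {
    std::chrono::steady_clock::time_point lastResponse;  // last successful ping reply
    bool failed = false;
};

// Periodically called by the master: mark silent workers as failed so that
// their completed map tasks and any in-progress tasks can be reset to idle
// and rescheduled on other workers.
void CheckWorkers(std::unordered_map<std::string, WorkerStatus>& workers,
                  std::chrono::seconds timeout) {
    auto now = std::chrono::steady_clock::now();
    for (auto& entry : workers) {
        WorkerStatus& status = entry.second;
        if (!status.failed && now - status.lastResponse > timeout) {
            status.failed = true;
            // At this point the master would reset the worker's tasks to Idle
            // (a hypothetical step not shown here) so they become eligible
            // for rescheduling.
        }
    }
}
</pre>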


==== Master Failure ====


It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore Google's current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.


== Phoenix ==
Phoenix implements MapReduce for shared-memory systems. Its goal is to support efficient execution on multiple cores without burdening the programmer with concurrency management. Phoenix consists of a simple API that is visible to application programmers and an efficient runtime that handles parallelization, resource management, and fault recovery.


=== Phoenix API ===
The current Phoenix implementation provides an application-programmer interface (API) for C and C++. The API contains two sets of functions. The first set is provided by Phoenix and is used by the programmer's application code to initialize the system and emit output pairs (1 required and 2 optional functions). The second set includes the functions that the programmer defines (3 required and 2 optional functions). Apart from the Map and Reduce functions, the user provides functions that partition the data before each step and a function that implements key comparison. The API is type agnostic. The function arguments are declared as void pointers wherever possible to provide flexibility in their declaration and fast use without conversion overhead. The data structure used to communicate basic function information and buffer allocation between the user code and runtime is of type scheduler_args_t. There are additional data structure types to facilitate communication between the Splitter, Map, Partition, and Reduce functions. These types use pointers whenever possible to implement communication without actually copying significant amounts of data.
The Phoenix API does not rely on any specific compiler options and does not require a parallelizing compiler. However, it assumes that its functions can freely use stack-allocated and heap-allocated structures for private data. It also assumes that there is no communication through shared-memory structures other than the input/output buffers for these functions. For C/C++, these assumptions cannot be checked statically for arbitrary programs. Although there are stringent checks within the system to ensure valid data are communicated between user and runtime code, it is ultimately the task of the user to provide functionally correct code.
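
Purely as an illustration of the kind of information scheduler_args_t carries (the field names and signatures below are assumptions, not Phoenix's actual header), the structure bundles the input/output buffers with the programmer-defined function pointers before being handed to the scheduler:
<pre>
#include <cstddef>

// Hypothetical sketch of a Phoenix-style argument structure; the real
// scheduler_args_t may differ in names, members, and layout.
typedef struct {
    void*  input_data;    // application-specific input buffer
    size_t input_size;    // size of the input in bytes
    void*  output_data;   // user-allocated buffer for the final sorted output

    // Programmer-defined functions; void* arguments keep the API type agnostic.
    void (*map)(void* data);                                     // Map task body
    void (*reduce)(void* key, void** vals, size_t val_count);    // Reduce task body
    int  (*splitter)(void* data, size_t unit_size, void** out);  // carve out Map units
    int  (*key_cmp)(const void* key1, const void* key2);         // key comparison
} scheduler_args_t;
</pre>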


[[File:Phoenix.jpg]]<br>


 
The Phoenix runtime was developed on top of [http://www.yolinux.com/TUTORIALS/LinuxTutorialPosixThreads.html Pthreads], but can be easily ported to other shared memory thread packages. The figure above shows the basic data flow for the runtime system. The runtime is controlled by the scheduler, which is initiated by user code. The scheduler creates and manages the threads that run all Map and Reduce tasks. It also manages the buffers used for task communication. The programmer provides the scheduler with all the required data and function pointers through the scheduler_args_t structure. After initialization, the scheduler determines the number of cores to use for this computation. For each core, it spawns a worker thread that is dynamically assigned some number of Map and Reduce tasks.
To start the Map stage, the scheduler uses the Splitter to divide input pairs into equally sized units to be processed by the Map tasks. The Splitter is called once per Map task and returns a pointer to the data the Map task will process. The Map tasks are allocated dynamically to workers and each one emits intermediate <key,value> pairs. The Partition function splits the intermediate pairs into units for the Reduce tasks. The function ensures all values of the same key go to the same unit. Within each buffer, values are ordered by key to assist with the final sorting. At this point, the Map stage is over. The scheduler must wait for all Map tasks to complete before initiating the Reduce stage.
Reduce tasks are also assigned to workers dynamically, similar to Map tasks. The one difference is that, while with Map tasks there is complete freedom in distributing pairs across tasks, with Reduce all values for the same key must be processed in one task. Hence, the Reduce stage may exhibit higher imbalance across workers and dynamic scheduling is more important. The output of each Reduce task is already sorted by key. As the last step, the final output from all tasks is merged into a single buffer, sorted by keys. The merging takes place in log2(P/2) steps, where P is the number of workers used.
 
 


=== Buffer Management ===
Two types of temporary buffers are necessary to store data between the various stages. All buffers are allocated in shared memory but are accessed in a well-specified way by a few functions. To re-arrange buffers (e.g., split across tasks), pointers are manipulated instead of the actual pairs, which may be large in size. The intermediate buffers are not directly visible to user code. Map-Reduce buffers are used to store the intermediate output pairs. Each worker has its own set of buffers. The buffers are initially sized to a default value and then resized dynamically as needed. At this stage, there may be multiple pairs with the same key. To accelerate the Partition function, the emit_intermediate function stores all values for the same key in the same buffer. At the end of the Map task, each buffer is sorted by key order. Reduce-Merge buffers are used to store the outputs of Reduce tasks before they are sorted. At this stage, each key has only one value associated with it. After sorting, the final output is available in the user-allocated output data buffer.
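
The pointer-based re-arrangement mentioned above can be sketched as follows; the types and the function are illustrative only, not Phoenix code.
<pre>
#include <functional>
#include <string>
#include <vector>

struct Pair { std::string key; std::string value; };

// Re-partition a worker's intermediate buffer across Reduce units by moving
// pointers; the (possibly large) keys and values themselves are never copied.
void RepartitionByKey(std::vector<Pair*>& mapBuffer,
                      std::vector<std::vector<Pair*>>& reduceBuffers) {
    const size_t units = reduceBuffers.size();
    for (Pair* p : mapBuffer) {
        size_t unit = std::hash<std::string>{}(p->key) % units;  // same key -> same unit
        reduceBuffers[unit].push_back(p);
    }
    mapBuffer.clear();
}
</pre>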


== Map Reduce on Graphics Processors ==


Compared with CPUs, the hardware architecture of GPUs differs significantly. For instance, current GPUs have over one hundred [http://encyclopedia.jrank.org/articles/pages/6904/SIMD-Single-Instruction-Multiple-Data-Processing.html SIMD (Single Instruction Multiple Data)] processors whereas current multi-core CPUs offer a much smaller number of cores. Moreover, most GPUs do not support atomic operations or locks. Due to these architectural differences, there are three technical challenges in implementing the MapReduce framework on the GPU. First, the synchronization overhead in the runtime system of the framework must be low so that the system can scale to hundreds of processors. Second, due to the lack of dynamic thread scheduling on current GPUs, it is essential to allocate work evenly across threads on the GPU to exploit its massive thread parallelism. Third, the core tasks of MapReduce programs, including string processing, file manipulation and concurrent reads and writes, are unconventional to GPUs and must be handled efficiently.


Mars, a MapReduce framework on the GPU, was designed and implemented with these challenges in mind. Mars provides a small set of APIs that are similar to those of CPU-based MapReduce. Its runtime system utilizes a large number of GPU threads for Map or Reduce tasks, and automatically assigns each thread a small number of key/value pairs to work on. As a result, the massive thread parallelism on the GPU is well utilized. To avoid any conflict between concurrent writes, Mars has a lock-free scheme with low runtime overhead on the massive thread parallelism of the GPU. This scheme guarantees the correctness of parallel execution with little synchronization overhead.


=== Mars API ===
Mars provides a small set of APIs. Similar to the existing MapReduce frameworks, Mars has two kinds of APIs: the user-implemented APIs, which the users implement, and the system-provided APIs, which the users can use as library calls. Mars has the following user-implemented APIs. These APIs are implemented in C/C++. The void* type is used so that the developer can manipulate strings and other complex data types conveniently.


<pre>
//MAP_COUNT counts result size of the map function.
void MAP_COUNT(void *key, void *val, int keySize, int valSize);

//The map function.
void MAP(void *key, void* val, int keySize, int valSize);

//REDUCE_COUNT counts result size of the reduce function.
void REDUCE_COUNT(void* key, void* vals, int keySize, int valCount);

//The reduce function.
void REDUCE(void* key, void* vals, int keySize, int valCount);
</pre>
Mars has the following four system-provided APIs. The emit functions are used in user-implemented map and reduce functions to output the intermediate/final results.
<pre>
//Emit the key size and the value size in MAP_COUNT.
void EMIT_INTERMEDIATE_COUNT(int keySize, int valSize);

//Emit an intermediate result in MAP.
void EMIT_INTERMEDIATE(void* key, void* val, int keySize, int valSize);

//Emit the key size and the value size in REDUCE_COUNT.
void EMIT_COUNT(int keySize, int valSize);

//Emit a final result in REDUCE.
void EMIT(void *key, void* val, int keySize, int valSize);
</pre>
Overall, the APIs in Mars are similar to those in the existing MapReduce frameworks such as Hadoop and Phoenix. The major difference is that Mars needs two APIs to implement the functionality of each CPU-based API. One is to count the size of results, and the other one is to output the results. This is because the GPU does not support atomic operations, and the Mars runtime uses a two-step design for the result output.


=== Implementation Details ===


Since the GPU does not support dynamic memory allocation on the device memory during the execution of the GPU code, arrays are used as the main data structure. The input data, the intermediate result and the final result are stored in three kinds of arrays, i.e., the key array, the value array and the directory index. The directory index consists of an entry of <key offset, key size, value offset, value size> for each key/value pair. Given a directory index entry, the key or the value at the corresponding offset in the key array or the value array is fetched. With the array structure, the space on the device memory is allocated for the input data as well as for the result output before executing the GPU program. However, the sizes of the output from the map and the reduce stages are unknown. The output scheme for the map stage is similar to that for the reduce stage, so only the map-stage scheme is described below.


First, each map task outputs three counts, i.e., the number of intermediate results, the total size of keys (in bytes) and the total size of values (in bytes) generated by the map task. Based on the key sizes (or value sizes) of all map tasks, the runtime system computes a prefix sum on these sizes and produces an array of write locations. A write location is the start location in the output array for the corresponding map task to write. Based on the number of intermediate results, the runtime system computes a prefix sum and produces an array of start locations in the output directory index for the corresponding map task. Through these prefix sums, the sizes of the arrays for the intermediate results are also known. Thus, the runtime allocates arrays in the device memory with the exact size for storing the intermediate results.
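
A small, self-contained sketch of this write-location computation (an exclusive prefix sum over per-task output sizes) is shown below; the per-task byte counts are made-up numbers.
<pre>
#include <iostream>
#include <vector>

int main() {
    // Hypothetical key-byte counts reported by four map tasks in the first pass.
    std::vector<int> keyBytesPerTask = {12, 7, 0, 25};
    std::vector<int> writeLocation(keyBytesPerTask.size());

    // Exclusive prefix sum: task i starts writing at the total size of tasks 0..i-1,
    // so every task gets a deterministic, non-overlapping region of the key array.
    int total = 0;
    for (size_t i = 0; i < keyBytesPerTask.size(); ++i) {
        writeLocation[i] = total;
        total += keyBytesPerTask[i];
    }

    for (size_t i = 0; i < writeLocation.size(); ++i)
        std::cout << "task " << i << " writes at offset " << writeLocation[i] << "\n";
    std::cout << "key array size to allocate: " << total << " bytes\n";
}
</pre>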


Second, each map task outputs the intermediate key/value pairs to the output array and updates the directory index. Since each map has its deterministic and non-overlapping positions to write to, the write conflicts are avoided. This two-step scheme does not require the hardware support of atomic functions. It is suitable for the massive thread parallelism on the GPU. However, it doubles the map computation in the worst case. The overhead of this scheme is application dependent, and is usually much smaller than that in the worst case.
=== Optimization Techniques ===
==== Memory Optimizations ====
Two memory optimizations are used to reduce the number of memory requests in order to improve the memory bandwidth utilization.
===== Coalesced accesses =====
The GPU feature of coalesced accesses is utilized to improve the memory performance. The memory accesses of each thread to the data arrays are designed according to the coalesced access pattern when applicable. Suppose there are T threads in total and the number of key/value pairs is N in the map stage. Thread i processes the (i + T • k )th (k=0,..,N/T) key/value pair. Due to the SIMD property of the GPU, the memory addresses from the threads within a thread group are consecutive and these accesses are coalesced into one. The figure below illustrates the map stage with and without the coalesced access optimization.<br>
[[File:Mars.jpg]]<br>
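
The indexing pattern can be sketched on the CPU as below; on the GPU, the outer loop corresponds to the T concurrent threads, so in any given step the threads of a group touch consecutive array slots and the hardware can coalesce those accesses into one memory request. The function name and callback are illustrative.
<pre>
#include <vector>

// Thread i processes elements i, i + T, i + 2T, ... of the directory index.
void ProcessCoalesced(const std::vector<int>& directoryIndex, int T,
                      void (*processPair)(int directoryEntry)) {
    const int N = static_cast<int>(directoryIndex.size());
    for (int i = 0; i < T; ++i) {               // logical thread id
        for (int k = 0; i + T * k < N; ++k) {   // k-th pair handled by thread i
            processPair(directoryIndex[i + T * k]);
        }
    }
}
</pre>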


===== Accesses using built-in vector types =====
Data accesses use the GPU's built-in vector types (such as char4), so that several adjacent data items are fetched in a single memory request. Compared with reading char or int, the number of memory requests is greatly reduced and the memory performance is improved.


==== Thread parallelism ====
The thread configuration, i.e., the number of thread groups and the number of threads per thread group, is related to multiple factors, including (1) the hardware configuration, such as the number of multiprocessors and the on-chip computation resources such as the number of registers on each multiprocessor, and (2) the computation characteristics of the map and the reduce tasks, e.g., whether they are memory- or computation-intensive. Since the map and the reduce functions are implemented by the developer, and their costs are unknown to the runtime system, it is difficult to find the optimal setting for the thread configuration at run time.


==== Handling variable-sized types ====  
The variable-sized types are supported with the directory index. If two key/value pairs need to be swapped, their corresponding entries in the directory index are swapped without modifying the key and the value arrays. This choice saves swapping cost, since the directory entries are typically much smaller than the key/value pairs. Even though swapping changes the order of entries in the directory index, the array layout is preserved and therefore accesses to the directory index can still be coalesced after swaps. Since strings are a typical variable-sized type, and string processing is common in web data analysis tasks, a GPU-based string manipulation library was developed for Mars. The operations in the library include strcmp, strcat, memset and so on. The APIs of these operations are consistent with those in the C/C++ library on the CPU. The difference is that simple algorithms are used for these GPU-based string operations, since they usually handle small strings within a map or a reduce task. In addition, char4 is used to implement strings to optimize the memory performance.
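
A brief sketch of the directory-index idea follows; the struct and field names are illustrative, taken from the <key offset, key size, value offset, value size> layout described above.
<pre>
#include <utility>

// Fixed-size descriptor for one variable-sized key/value pair.
struct DirEntry {
    int keyOffset;   // offset of the key in the key array
    int keySize;     // key size in bytes
    int valOffset;   // offset of the value in the value array
    int valSize;     // value size in bytes
};

// Reordering two pairs swaps only their small descriptors; the keys and
// values stay where they are in the key and value arrays.
inline void SwapPairs(DirEntry& a, DirEntry& b) {
    std::swap(a, b);
}
</pre>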


==== Hashing ====
Hashing is used in the sort algorithm to store the results with the same key value consecutively; in that case, the results do not need to be in strict ascending or descending key order. Each key is hashed into a 32-bit integer, and the records are sorted according to their hash values. When two records are compared, their hash values are compared first. Only when their hash values are the same are their keys fetched and compared. Given a good hash function, the probability of comparing the keys is low.
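
A small sketch of this hash-first comparison is given below; the record layout and hash choice are assumptions for illustration.
<pre>
#include <cstdint>
#include <functional>
#include <string>

struct Record {
    uint32_t keyHash;   // 32-bit hash of the key, computed once up front
    std::string key;
};

// Sort predicate: order by hash value; fall back to the actual keys only on
// a hash collision, which is rare with a good hash function.
bool RecordLess(const Record& a, const Record& b) {
    if (a.keyHash != b.keyHash) return a.keyHash < b.keyHash;
    return a.key < b.key;
}

uint32_t HashKey(const std::string& key) {
    return static_cast<uint32_t>(std::hash<std::string>{}(key));  // illustrative choice
}
</pre>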


==== File manipulation ====
Currently, the GPU cannot directly access the data in the hard disk. Thus, the file manipulation with the assistance of the CPU is performed in three phases. First, the file I/O on the CPU is performed and the file data is loaded into a buffer in the main memory. To reduce the I/O stall, multiple threads are used to perform the I/O task. Second, the preprocessing on the buffered data is performed and the input key/value pairs are obtained. Finally, the input key/value pairs are copied to the GPU device memory.


= Summary =
Google’s MapReduce runtime implementation targets large clusters of Linux PCs connected through Ethernet switches. Tasks are forked using remote procedure calls. Buffering and communication occurs by reading and writing files on a distributed file system. The locality optimizations focus mostly on avoiding remote file accesses. While such a system is effective with distributed computing, it leads to very high overheads if used with shared-memory systems that facilitate communication through memory and are typically of much smaller scale.


Phoenix implements MapReduce for shared-memory systems and minimizes the overheads of task spawning and data communication. With Phoenix, the programmer provides a simple, functional expression of the algorithm and leaves parallelization and scheduling to the runtime system. Phoenix leads to scalable performance for both multi-core chips and conventional symmetric multiprocessors, and it automatically handles the key scheduling decisions during parallel execution. Despite the runtime overheads, results have shown that the performance of Phoenix is close to that of parallel code written directly with the Pthreads API. Nevertheless, there are also applications that do not fit naturally in the MapReduce model, for which Pthreads code performs significantly better.

Graphics processors have emerged as a commodity platform for parallel computing, but developing GPU applications requires knowledge of the GPU architecture and considerable effort, even more so for complex and performance-centric tasks such as web data analysis. Since MapReduce has been successful in easing the development of web data analysis tasks, a GPU-based MapReduce framework can be used for these applications: the developer writes code against the simple and familiar MapReduce interfaces, and the GPU runtime is completely hidden by the framework. In reported measurements, the optimized Mars is around 1.5-16x faster than Phoenix when the data set is large; the speedup varies with the computation characteristics of the application, from over 4x for computation-intensive applications to only slightly faster for simpler ones.


= References =
1. [http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/archive/mapreduce-osdi04.pdf MapReduce: Simplified Data Processing on Large Clusters. Jeffrey Dean, Sanjay Ghemawat] <br>
2. [http://www.ece.rutgers.edu/~parashar/Classes/08-09/ece572/readings/mars-pact-08.pdf Mars: A MapReduce Framework on Graphics Processors. Bingsheng He, Wenbin Fang, Qiong Luo, Naga K. Govindaraju, Tuyong Wang] <br>
3. [http://csl.stanford.edu/~christos/publications/2007.cmp_mapreduce.hpca.pdf Evaluating MapReduce for Multi-core and Multiprocessor Systems. Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, Christos Kozyrakis] <br>
4. [http://hadoop.apache.org/mapreduce/ Hadoop MapReduce] <br>
5. [http://code.google.com/edu/parallel/mapreduce-tutorial.html Google MapReduce] <br>
6. [http://csl.stanford.edu/~christos/publications/2011.phoenixplus.mapreduce.pdf Phoenix++: Modular MapReduce for Shared Memory Systems. Justin Talbot, Richard M. Yoo, Christos Kozyrakis] <br>

Latest revision as of 01:02, 21 February 2012

Introduction

MapReduce is a software framework introduced by Google in 2004 to support distributed computing on large data sets on clusters of computers. MapReduce programming model consists of two major steps: map and reduce. In the map step, the problem being solved is divided into a series of sub-problems and distributed to different workers. After collecting results from workers, the computation enters the reduce step to combine and produce the final result.

Programming Model

(Figure: MapReduce for a Shape Counter)


The MapReduce programming model is inspired by functional languages and targets data-intensive computations. The input data format is application-specific, and is specified by the user. The output is a set of <key,value> pairs. The user expresses an algorithm using two functions, Map and Reduce. The Map function is applied on the input data and produces a list of intermediate <key,value> pairs. The Reduce function is applied to all intermediate pairs with the same key. It typically performs some kind of merging operation and produces zero or more output pairs. Finally, the output pairs are sorted by their key value. In the simplest form of MapReduce programs, the programmer provides just the Map function. All other functionality, including the grouping of the intermediate pairs which have the same key and the final sorting, is provided by the runtime. The main benefit of this model is simplicity. The programmer provides a simple description of the algorithm that focuses on functionality and not on parallelization. The actual parallelization and the details of concurrency management are left to the runtime system. Hence the program code is generic and easily portable across systems. Nevertheless, the model provides sufficient high-level information for parallelization. The Map function can be executed in parallel on non-overlapping portions of the input data and the Reduce function can be executed in parallel on each set of intermediate pairs with the same key. Similarly, since it is explicitly known which pairs each function will operate upon, one can employ pre-fetching or other scheduling optimizations for locality.

Examples

Below are a few simple examples of programs that can be easily expressed as MapReduce computations.

  • Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
  • Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.
  • Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.
  • Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.
  • Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions.

Sample Code

The following pseudocode shows the basic structure of a MapReduce program that counts the number of occurrences of each word in a collection of documents.

//Input: a document
//Intermediate output: key = word, value = 1
Map(void *input){
   for each word w in input
       EmitIntermediate(w, 1)
}

//Intermediate output: key = word, value = 1
//Output: key = word, value = total occurrences
Reduce(String key, Iterator values){
   int result = 0;
   for each v in values
       result += v
   Emit(key, result)
}

Runtime System

The MapReduce runtime is responsible for parallelization and concurrency control. To parallelize the Map function, it splits the input pairs into units that are processed concurrently on multiple nodes. Next, the runtime partitions the intermediate pairs using a scheme that keeps pairs with the same key in the same unit. The partitions are processed in parallel by Reduce tasks running on multiple nodes. In both steps, the runtime must decide on factors such as the size of the units, the number of nodes involved, how units are assigned to nodes dynamically, and how buffer space is allocated. The decisions can be fully automatic or guided by the programmer, given application-specific knowledge. These decisions allow the runtime to execute a program efficiently across a wide range of machines and dataset scenarios without modifications to the source code. Finally, the runtime must merge and sort the output pairs from all Reduce tasks.

Implementations

Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines. Phoenix implements MapReduce for shared-memory systems. Hadoop and Google's MapReduce implement MapReduce for large clusters of commodity PCs connected together with switched Ethernet. Mars is a MapReduce framework on graphics processors (GPUs).

Google's MapReduce

Google's MapReduce implements MapReduce for large clusters of commodity PCs connected together with switched Ethernet.

Execution Overview

The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
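For example, a default partitioning function of this form might look like the following sketch. The hash shown is an arbitrary illustrative string hash chosen only for demonstration, not the one used by Google's implementation.

/* Illustrative partitioning function: maps a string key to one of R
   reduce partitions using hash(key) mod R.  The hash itself is a simple
   djb2-style example, used here only for illustration. */
unsigned int partition(const char *key, unsigned int R)
{
    unsigned int h = 5381;
    while (*key != '\0')
        h = h * 33 + (unsigned char)*key++;
    return h % R;
}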


The overall flow of a MapReduce operation in Google's implementation is as follows. When the user program calls the MapReduce function, the following sequence of actions occurs:

1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns to the user code. After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file; they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.

Master Data Structures

The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks). The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
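A minimal sketch of this bookkeeping is shown below. The task states and the R intermediate file regions per completed map task come from the description above; the struct and field names themselves are hypothetical.

/* Hypothetical sketch of the per-task state kept by the master. */
typedef enum { TASK_IDLE, TASK_IN_PROGRESS, TASK_COMPLETED } task_state_t;

typedef struct {
    task_state_t state;            /* idle, in-progress, or completed            */
    int          worker_id;        /* identity of the worker machine (non-idle)  */
    /* For a completed map task: location and size of each of the R intermediate
       file regions it produced on its worker's local disk.                      */
    char       **region_location;  /* R entries                                  */
    long        *region_size;      /* R entries                                  */
} task_info_t;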

Fault Tolerance

Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.

Worker Failure

The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling. Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system. When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B. MapReduce is resilient to large-scale worker failures.

Master Failure

It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore Google's current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.

Phoenix

Phoenix implements MapReduce for shared-memory systems. Its goal is to support efficient execution on multiple cores without burdening the programmer with concurrency management. Phoenix consists of a simple API that is visible to application programmers and an efficient runtime that handles parallelization, resource management, and fault recovery.

Phoenix API

The current Phoenix implementation provides an application-programmer interface (API) for C and C++. The API consists of two sets of functions. The first set is provided by Phoenix and is used by the programmer's application code to initialize the system and emit output pairs (1 required and 2 optional functions). The second set includes the functions that the programmer defines (3 required and 2 optional functions). Apart from the Map and Reduce functions, the user provides functions that partition the data before each step and a function that implements key comparison. The API is type-agnostic: the function arguments are declared as void pointers wherever possible to provide flexibility in their declaration and fast use without conversion overhead. The data structure used to communicate basic function information and buffer allocation between the user code and the runtime is of type scheduler_args_t. There are additional data structure types to facilitate communication between the Splitter, Map, Partition, and Reduce functions. These types use pointers whenever possible to implement communication without actually copying significant amounts of data.

The Phoenix API does not rely on any specific compiler options and does not require a parallelizing compiler. However, it assumes that its functions can freely use stack-allocated and heap-allocated structures for private data, and that there is no communication through shared-memory structures other than the input/output buffers for these functions. For C/C++, these assumptions cannot be checked statically for arbitrary programs. Although there are stringent checks within the system to ensure valid data are communicated between user and runtime code, it is ultimately the programmer's responsibility to provide functionally correct code.
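As a rough illustration of this interface, the sketch below shows one plausible shape for the scheduler_args_t structure. Only the name scheduler_args_t comes from the description above; every field name and function-pointer signature here is a hypothetical stand-in, not the actual Phoenix API.

/* Hypothetical sketch of a scheduler_args_t structure of the kind described
   above.  Field names and signatures are illustrative only. */
typedef struct {
    void *input_data;            /* application-specific input, opaque to the runtime */
    long  data_size;             /* size of the input in bytes                         */
    void *output_data;           /* buffer where the final sorted pairs are placed     */

    /* user-defined functions, passed as pointers (type-agnostic: void*) */
    int  (*splitter)(void *input, int unit_size, void **chunk);        /* carve Map units */
    void (*map)(void *chunk);                                          /* required        */
    void (*reduce)(void *key, void **vals, int val_count);             /* required        */
    int  (*partition)(int num_reduce_tasks, void *key, int key_size);  /* optional        */
    int  (*key_cmp)(const void *key1, const void *key2);               /* key comparison  */
} scheduler_args_t;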


The Phoenix runtime was developed on top of Pthreads, but can be easily ported to other shared-memory thread packages. The runtime is controlled by the scheduler, which is initiated by user code. The scheduler creates and manages the threads that run all Map and Reduce tasks, and it also manages the buffers used for task communication. The programmer provides the scheduler with all the required data and function pointers through the scheduler_args_t structure. After initialization, the scheduler determines the number of cores to use for this computation. For each core, it spawns a worker thread that is dynamically assigned some number of Map and Reduce tasks.

To start the Map stage, the scheduler uses the Splitter to divide input pairs into equally sized units to be processed by the Map tasks. The Splitter is called once per Map task and returns a pointer to the data the Map task will process. The Map tasks are allocated dynamically to workers and each one emits intermediate <key,value> pairs. The Partition function splits the intermediate pairs into units for the Reduce tasks, ensuring that all values of the same key go to the same unit. Within each buffer, values are ordered by key to assist with the final sorting. At this point, the Map stage is over; the scheduler must wait for all Map tasks to complete before initiating the Reduce stage.

Reduce tasks are also assigned to workers dynamically, similar to Map tasks. The one difference is that, while with Map tasks there is complete freedom in distributing pairs across tasks, with Reduce all values for the same key must be processed in one task. Hence, the Reduce stage may exhibit higher imbalance across workers, and dynamic scheduling is more important. The output of each Reduce task is already sorted by key. As the last step, the final output from all tasks is merged into a single buffer, sorted by keys.

Buffer Management

Two types of temporary buffers are necessary to store data between the various stages. All buffers are allocated in shared memory but are accessed in a well-specified way by a few functions. To re-arrange buffers (e.g., to split them across tasks), pointers are manipulated instead of copying the actual pairs, which may be large in size. The intermediate buffers are not directly visible to user code. Map-Reduce buffers are used to store the intermediate output pairs. Each worker has its own set of buffers. The buffers are initially sized to a default value and then resized dynamically as needed. At this stage, there may be multiple pairs with the same key. To accelerate the Partition function, the emit intermediate function stores all values for the same key in the same buffer. At the end of the Map task, each buffer is sorted by key order. Reduce-Merge buffers are used to store the outputs of Reduce tasks before they are sorted. At this stage, each key has only one value associated with it. After sorting, the final output is available in the user-allocated Output data buffer.

Map Reduce on Graphics Processors

Compared with CPUs, the hardware architecture of GPUs differs significantly. For instance, current GPUs have over one hundred SIMD (Single Instruction Multiple Data) processors, whereas current multi-core CPUs offer a much smaller number of cores. Moreover, most GPUs do not support atomic operations or locks. Due to these architectural differences, there are three technical challenges in implementing the MapReduce framework on the GPU. First, the synchronization overhead in the runtime system of the framework must be low so that the system can scale to hundreds of processors. Second, due to the lack of dynamic thread scheduling on current GPUs, it is essential to allocate work evenly across threads on the GPU to exploit its massive thread parallelism. Third, the core tasks of MapReduce programs, including string processing, file manipulation and concurrent reads and writes, are unconventional for GPUs and must be handled efficiently.

Mars, a MapReduce framework on the GPU, was designed and implemented with these challenges in mind. Mars provides a small set of APIs that are similar to those of CPU-based MapReduce. Its runtime system utilizes a large number of GPU threads for Map or Reduce tasks, and automatically assigns each thread a small number of key/value pairs to work on. As a result, the massive thread parallelism on the GPU is well utilized. To avoid conflicts between concurrent writes, Mars uses a lock-free scheme with low runtime overhead that preserves the massive thread parallelism of the GPU. This scheme guarantees the correctness of parallel execution with little synchronization overhead.

Mars API

Mars provides a small set of APIs. Similar to existing MapReduce frameworks, Mars has two kinds of APIs: user-implemented APIs, which the developer implements, and system-provided APIs, which the developer can use as library calls. Mars has the following user-implemented APIs. They are implemented in C/C++, and the void* type is used so that the developer can manipulate strings and other complex data types conveniently.

//MAP_COUNT counts the result size of the map function.
void MAP_COUNT(void *key, void *val, int keySize, int valSize);
//The map function.
void MAP(void *key, void *val, int keySize, int valSize);
//REDUCE_COUNT counts the result size of the reduce function.
void REDUCE_COUNT(void *key, void *vals, int keySize, int valCount);
//The reduce function.
void REDUCE(void *key, void *vals, int keySize, int valCount);

Mars has the following four system-provided APIs. The emit functions are used in user-implemented map and reduce functions to output the intermediate/final results.

//Emit the key size and the value size in MAP_COUNT.
void EMIT_INTERMEDIATE_COUNT(int keySize, int valSize);
//Emit an intermediate result in MAP.
void EMIT_INTERMEDIATE(void *key, void *val, int keySize, int valSize);
//Emit the key size and the value size in REDUCE_COUNT.
void EMIT_COUNT(int keySize, int valSize);
//Emit a final result in REDUCE.
void EMIT(void *key, void *val, int keySize, int valSize);

Overall, the APIs in Mars are similar to those in the existing MapReduce frameworks such as Hadoop and Phoenix. The major difference is that Mars needs two APIs to implement the functionality of each CPU-based API. One is to count the size of results, and the other one is to output the results. This is because the GPU does not support atomic operations, and the Mars runtime uses a two-step design for the result output.
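To make the two-step design concrete, the sketch below shows how the Map stage of a word-count program might be written against these APIs. The MAP_COUNT, MAP, EMIT_INTERMEDIATE_COUNT and EMIT_INTERMEDIATE names come from the listings above; the tokenization logic and the assumption that the value is a space-separated text chunk are purely illustrative.

// Illustrative word-count Map stage for Mars (sketch only).
// Assumption: val points to a space-separated text chunk of valSize bytes,
// and each emitted intermediate pair is <word, 1>; the input key is unused.

// First pass: report the sizes of the intermediate keys and values so the
// runtime can compute prefix sums and pre-allocate the output arrays.
void MAP_COUNT(void *key, void *val, int keySize, int valSize)
{
    char *text = (char *)val;
    int i = 0;
    while (i < valSize) {
        while (i < valSize && text[i] == ' ') i++;        // skip separators
        int start = i;
        while (i < valSize && text[i] != ' ') i++;        // scan one word
        if (i > start)
            EMIT_INTERMEDIATE_COUNT(i - start, sizeof(int));
    }
}

// Second pass: write the intermediate pairs to the pre-allocated,
// non-overlapping locations assigned to this map task by the runtime.
void MAP(void *key, void *val, int keySize, int valSize)
{
    char *text = (char *)val;
    int one = 1;
    int i = 0;
    while (i < valSize) {
        while (i < valSize && text[i] == ' ') i++;
        int start = i;
        while (i < valSize && text[i] != ' ') i++;
        if (i > start)
            EMIT_INTERMEDIATE(&text[start], &one, i - start, sizeof(int));
    }
}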

Implementation Details

Since the GPU does not support dynamic memory allocation on the device memory during the execution of the GPU code, arrays are used as the main data structure. The input data, the intermediate result and the final result are stored in three kinds of arrays: the key array, the value array and the directory index. The directory index consists of an entry of <key offset, key size, value offset, value size> for each key/value pair. Given a directory index entry, the key or the value at the corresponding offset in the key array or the value array is fetched. With this array structure, the space on the device memory for the input data as well as for the result output is allocated before executing the GPU program. However, the sizes of the output from the map and the reduce stages are not known in advance, so a two-step output scheme is used. The output scheme for the map stage, described below, is similar to that for the reduce stage.
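In C terms, a directory index entry of this form could be declared as follows; the struct and field names are illustrative, while the four offset/size fields correspond to the entry format described above.

/* Illustrative directory index entry: locates one key/value pair by
   offsets into the key array and the value array. */
typedef struct {
    int key_offset;   /* byte offset of the key in the key array     */
    int key_size;     /* key length in bytes                         */
    int val_offset;   /* byte offset of the value in the value array */
    int val_size;     /* value length in bytes                       */
} dir_entry_t;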

First, each map task outputs three counts: the number of intermediate results, the total size of keys (in bytes) and the total size of values (in bytes) generated by the map task. Based on the key sizes (or value sizes) of all map tasks, the runtime system computes a prefix sum on these sizes and produces an array of write locations. A write location is the start location in the output array where the corresponding map task will write. Based on the number of intermediate results, the runtime system computes another prefix sum and produces an array of start locations in the output directory index for the corresponding map tasks. Through these prefix sums, the sizes of the arrays for the intermediate result are also known. Thus, the runtime allocates arrays in the device memory with the exact size for storing the intermediate results.

Second, each map task outputs the intermediate key/value pairs to the output array and updates the directory index. Since each map has its deterministic and non-overlapping positions to write to, the write conflicts are avoided. This two-step scheme does not require the hardware support of atomic functions. It is suitable for the massive thread parallelism on the GPU. However, it doubles the map computation in the worst case. The overhead of this scheme is application dependent, and is usually much smaller than that in the worst case.
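The prefix-sum step can be sketched as follows. On the GPU this scan is performed in parallel; the sequential version below, with hypothetical names, only illustrates how per-task counts turn into non-overlapping write locations and an exact allocation size.

/* Illustrative exclusive prefix sum over per-task key sizes.
   key_write_offset[i] is where map task i starts writing its keys;
   the final total is the exact key-array size to allocate on the device. */
void compute_write_locations(const int *key_bytes_per_task, int num_tasks,
                             int *key_write_offset, int *total_key_bytes)
{
    int running = 0;
    for (int i = 0; i < num_tasks; i++) {
        key_write_offset[i] = running;
        running += key_bytes_per_task[i];
    }
    *total_key_bytes = running;
}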

Optimization Techniques

Memory Optimizations

Two memory optimizations are used to reduce the number of memory requests in order to improve the memory bandwidth utilization.

Coalesced accesses

The GPU feature of coalesced accesses is utilized to improve the memory performance. The memory accesses of each thread to the data arrays are designed according to the coalesced access pattern when applicable. Suppose there are T threads in total and the number of key/value pairs is N in the map stage. Thread i processes the (i + T·k)-th key/value pair, for k = 0, ..., N/T. Due to the SIMD property of the GPU, the memory addresses issued by the threads within a thread group are consecutive, and these accesses are coalesced into one memory transaction.
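The access pattern can be sketched as follows. This is CPU-style code for what a single GPU thread does; the summation is only a stand-in for the real per-pair map work.

/* Illustrative per-thread loop for the map stage.  With T threads in total,
   thread i touches pairs i, i+T, i+2T, ...; at each step k the T threads
   access consecutive elements, so the loads can be coalesced into a single
   memory transaction. */
long map_stage_for_thread(int i, int T, int N, const int *pair_data)
{
    long local_work = 0;
    for (int k = 0; i + T * k < N; k++)
        local_work += pair_data[i + T * k];   /* stand-in for processing pair (i + T*k) */
    return local_work;
}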

Accesses using built-in vector types

Accessing the values in the device memory can be costly, because the data values are often of different sizes and the accesses are rarely coalesced. Fortunately, GPUs such as the G80 support built-in vector types such as char4 and int4. Reading a built-in vector fetches the entire vector in a single memory request. Compared with reading char or int, the number of memory requests is greatly reduced and the memory performance is improved.

Thread parallelism

The thread configuration, i.e., the number of thread groups and the number of threads per thread group, is related to multiple factors, including (1) the hardware configuration, such as the number of multiprocessors and the on-chip computation resources such as the number of registers on each multiprocessor, and (2) the computation characteristics of the map and the reduce tasks, e.g., whether they are memory- or computation-intensive. Since the map and the reduce functions are implemented by the developer, and their costs are unknown to the runtime system, it is difficult to find the optimal thread configuration at run time.

Handling variable-sized types

Variable-sized types are supported through the directory index. If two key/value pairs need to be swapped, their corresponding entries in the directory index are swapped without modifying the key and the value arrays. This saves swapping cost, since the directory entries are typically much smaller than the key/value pairs. Even though swapping changes the order of entries in the directory index, the array layout is preserved and therefore accesses to the directory index can still be coalesced after swaps. Since strings are a typical variable-sized type, and string processing is common in web data analysis tasks, a GPU-based string manipulation library was developed for Mars. The operations in the library include strcmp, strcat, memset and so on. The APIs of these operations are consistent with those of the C/C++ library on the CPU. The difference is that simple algorithms are used for these GPU-based string operations, since they usually handle small strings within a map or a reduce task. In addition, char4 is used to implement strings to optimize the memory performance.

Hashing

Hashing is used in the sort algorithm to store the results with the same key value consecutively. In that case, the results do not need to be in strict ascending or descending order of their key values. A hashing technique that hashes each key into a 32-bit integer is used, and the records are sorted according to their hash values. When two records are compared, their hash values are compared first; only when their hash values are the same are their keys fetched and compared. Given a good hash function, the probability of having to compare the keys is low.
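A comparison routine of this kind might be sketched as follows; the 32-bit hash width matches the description above, while the record layout and names are illustrative.

#include <stdint.h>
#include <string.h>

/* Illustrative sort comparator: compare 32-bit hash values first and fetch
   the keys only on a hash collision, which is rare with a good hash. */
typedef struct {
    uint32_t    hash;      /* 32-bit hash of the key       */
    const char *key;       /* pointer into the key array   */
    int         key_size;  /* key length in bytes          */
} record_t;

int record_cmp(const record_t *a, const record_t *b)
{
    if (a->hash != b->hash)
        return (a->hash < b->hash) ? -1 : 1;   /* usual case: keys never touched */
    int n = a->key_size < b->key_size ? a->key_size : b->key_size;
    int c = memcmp(a->key, b->key, n);         /* rare case: hash collision      */
    return c != 0 ? c : (a->key_size - b->key_size);
}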

File manipulation

Currently, the GPU cannot directly access data on the hard disk. Thus, file manipulation is performed with the assistance of the CPU in three phases. First, file I/O is performed on the CPU and the file data is loaded into a buffer in main memory. To reduce the I/O stall, multiple threads are used to perform the I/O task. Second, the buffered data is preprocessed to obtain the input key/value pairs. Finally, the input key/value pairs are copied to the GPU device memory.

Summary

Google’s MapReduce runtime implementation targets large clusters of Linux PCs connected through Ethernet switches. Tasks are forked using remote procedure calls. Buffering and communication occurs by reading and writing files on a distributed file system. The locality optimizations focus mostly on avoiding remote file accesses. While such a system is effective with distributed computing, it leads to very high overheads if used with shared-memory systems that facilitate communication through memory and are typically of much smaller scale.

Phoenix, an implementation of MapReduce for shared-memory systems, minimizes the overheads of task spawning and data communication. With Phoenix, the programmer provides a simple, functional expression of the algorithm and leaves parallelization and scheduling to the runtime system. Phoenix achieves scalable performance on both multi-core chips and conventional symmetric multiprocessors, and it automatically handles key scheduling decisions during parallel execution. Despite runtime overheads, results have shown that the performance of Phoenix is close to that of parallel code written directly with the Pthreads API. Nevertheless, there are also applications that do not fit naturally in the MapReduce model, for which Pthreads code performs significantly better.

Graphics processors have emerged as a commodity platform for parallel computing. However, developing GPU applications requires knowledge of the GPU architecture and considerable effort, even more so for complex and performance-centric tasks such as web data analysis. Since MapReduce has been successful in easing the development of web data analysis tasks, a GPU-based MapReduce framework can be used for these applications. With such a framework, the developer writes code using the simple and familiar MapReduce interfaces, and the runtime on the GPU is completely hidden from the developer by the framework.

References

1. MapReduce: Simplified Data Processing on Large Clusters. Sanjay Ghemawat, Jeffrey Dean. http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/archive/mapreduce-osdi04.pdf
2. Mars: A MapReduce Framework on Graphics Processors. Bingsheng He, Wenbin Fang, Qiong Luo, Naga K. Govindaraju, Tuyong Wang. http://www.ece.rutgers.edu/~parashar/Classes/08-09/ece572/readings/mars-pact-08.pdf
3. Evaluating MapReduce for Multi-core and Multiprocessor Systems. Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, Christos Kozyrakis. http://csl.stanford.edu/~christos/publications/2007.cmp_mapreduce.hpca.pdf
4. Hadoop MapReduce. http://hadoop.apache.org/mapreduce/
5. Google MapReduce. http://code.google.com/edu/parallel/mapreduce-tutorial.html
6. Phoenix++: Modular MapReduce for Shared Memory Systems. Justin Talbot, Richard M. Yoo, Christos Kozyrakis. http://csl.stanford.edu/~christos/publications/2011.phoenixplus.mapreduce.pdf