CSC/ECE 506 Spring 2012/3b sk: Difference between revisions
No edit summary |
|||
Line 9: | Line 9: | ||
= Examples = | = Examples = | ||
Below are a few simple examples of programs that can be easily expressed as MapReduce computations. | |||
Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output. <br> | *Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output. <br> | ||
Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair. <br> | *Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair. <br> | ||
Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.<br> | *Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.<br> | ||
Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.<br> | *Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.<br> | ||
Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. [1] | *Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. [1] | ||
= Sample Code = | = Sample Code = |
Revision as of 05:32, 20 February 2012
Introduction
MapReduce is a software framework introduced by Google in 2004 to support distributed computing on large data sets on clusters of computers. MapReduce programming model consists of two major steps: map and reduce. In the map step, the problem being solved is divided into a series of sub-problems and distributed to different workers. After collecting results from workers, the computation enters the reduce step to combine and produce the final result.
Programming Model
The MapReduce programming model is inspired by functional languages and targets data-intensive computations. The input data format is application-specific, and is specified by the user. The output is a set of <key,value> pairs. The user expresses an algorithm using two functions, Map and Reduce. The Map function is applied on the input data and produces a list of intermediate <key,value> pairs. The Reduce function is applied to all intermediate pairs with the same key. It typically performs some kind of merging operation and produces zero or more output pairs. Finally, the output pairs are sorted by their key value. In the simplest form of MapReduce programs, the programmer provides just the Map function. All other functionality, including the grouping of the intermediate pairs which have the same key and the final sorting, is provided by the runtime. The main benefit of this model is simplicity. The programmer provides a simple description of the algorithm that focuses on functionality and not on parallelization. The actual parallelization and the details of concurrency management are left to the runtime system. Hence the program code is generic and easily portable across systems. Nevertheless, the model provides sufficient high-level information for parallelization. The Map function can be executed in parallel on non-overlapping portions of the input data and the Reduce function can be executed in parallel on each set of intermediate pairs with the same key. Similarly, since it is explicitly known which pairs each function will operate upon, one can employ pre-fetching or other scheduling optimizations for locality.
Examples
Below are a few simple examples of programs that can be easily expressed as MapReduce computations.
- Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
- Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.
- Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.
- Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.
- Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. [1]
Sample Code
The following pseudocode shows the basic structure of a MapReduce program that counts the number of occurences of each word in a collection of documents.
//Input : a Document //Intermediate Output: key = word, value = 1 Map(void * input){ for each word w in Input Emit Intermediate(w,1) } //Intermediate Output key = word, value = 1 //Output : key = word, value = occurrences Reduce(String key, Iterator values){ int result = 0; for each v in values result += v Emit(w, result) }
Runtime System
The MapReduce runtime is responsible for parallelization and concurrency control. To parallelize the Map function, it splits the input pairs into units that are processed concurrently on multiple nodes. Next, the runtime partitions the intermediate pairs using a scheme that keeps pairs with the same key in the same unit. The partitions are processed in parallel by Reduce tasks running on multiple nodes. In both steps, the runtime must decide on factors such as the size of the units, the number of nodes involved, how units are assigned to nodes dynamically, and how buffer space is allocated. The decisions can be fully automatic or guided by the programmer given application specific knowledge. These decisions allow the runtime to execute a program efficiently across a wide range of machines and dataset scenarios without modifications to the source code. Finally, the runtime must merge and sort the output pairs from all Reduce tasks.
Implementations
Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines. Phoenix implements MapReduce for shared-memory systems. Hadoop and Google's MapReduce implement map reduce for large clusters of commodity PCs connected together with switched Ethernet. Mars is a MapReduce framework on graphics processors (GPUs).
Google's MapReduce
Google's MapReduce implements MapReduce for large clusters of commodity PCs connected together with switched Ethernet.
Execution Overview
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specified by the user.
The figure above shows the overall flow of a MapReduce operation in Google's implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in the figure above correspond to the numbers in the list below):
1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.
2. One of the copies of the program is special . The master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.
4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.
6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code. After successful completion, the output of the mapreduce execution is available in the R output files (one per reduce task, with file names as specified by the user). Typically, users do not need to combine these R output files into one file . they often pass these files as input to another MapReduce call, or use them from another distributed application that is able to deal with input that is partitioned into multiple files.
Master Data Structures
The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed), and the identity of the worker machine (for non-idle tasks). The master is the conduit through which the location of intermediate file regions is propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
Fault Tolerance
Since the MapReduce library is designed to help process very large amounts of data using hundreds or thousands of machines, the library must tolerate machine failures gracefully.
Worker Failure
The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling. Completed map tasks are re-executed on a failure because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system. When a map task is executed first by worker A and then later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B. MapReduce is resilient to large-scale worker failures.
Master Failure
It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore Google's current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.
Phoenix
Phoenix implements MapReduce for shared-memory systems. Its goal is to support efficient execution on multiple cores without burdening the programmer with concurrency management. Phoenix consists of a simple API that is visible to application programmers and an efficient runtime that handles parallelization, resource management, and fault recovery.
Phoenix API
The current Phoenix implementation provides an application-programmer interface (API) for C and C++.The first set is provided by Phoenix and is used by the programmer’s application code to initialize the system and emit output pairs (1 required and 2 optional functions). The second set includes the functions that the programmer defines (3 required and 2 optional functions). Apart from the Map and Reduce functions, the user provides functions that partition the data before each step and a function that implements key comparison. The API is type agnostic. The function arguments are declared as void pointers wherever possible to provide flexibility in their declaration and fast use without conversion overhead. The data structure used to communicate basic function information and buffer allocation between the user code and runtime is of type scheduler_args_t. There are additional data structure types to facilitate communication between the Splitter, Map, Partition, and Reduce functions. These types use pointers whenever possible to implement communication without actually copying significant amounts of data. The Phoenix API does not rely on any specific compiler options and does not require a parallelizing compiler. However, it assumes that its functions can freely use stack-allocated and heap-allocated structures for private data. It also assumes that there is no communication through shared-memory structures other than the input/output buffers for these functions. For C/C++, these assumptions cannot be checked statically for arbitrary programs. Although there are stringent checks within the system to ensure valid data are communicated between user and runtime code, eventually it is the task of user to provide functionally correct code.
The Phoenix runtime was developed on top of Pthreads, but can be easily ported to other shared memory thread packages. The figure above shows the basic data flow for the runtime system. The runtime is controlled by the scheduler, which is initiated by user code. The scheduler creates and manages the threads that run all Map and Reduce tasks. It also manages the buffers used for task communication. The programmer provides the scheduler with all the required data and function pointers through the scheduler_args_t structure. After initialization, the scheduler determines the number of cores to use for this computation. For each core, it spawns a worker thread that is dynamically assigned some number of Map and Reduce tasks. To start the Map stage, the scheduler uses the Splitter to divide input pairs into equally sized units to be processed by the Map tasks. The Splitter is called once per Map task and returns a pointer to the data the Map task will process. The Map tasks are allocated dynamically to workers and each one emits intermediate <key,value> pairs. The Partition function splits the intermediate pairs into units for the Reduce tasks. The function ensures all values of the same key go to the same unit. Within each buffer, values are ordered by key to assist with the final sorting. At this point, the Map stage is over. The scheduler must wait for all Map tasks to complete before initiating the Reduce stage. Reduce tasks are also assigned to workers dynamically, similar to Map tasks. The one difference is that, while with Map tasks there is complete freedom in distributing pairs across tasks, with Reduce all values for the same key must be processed in one task. Hence, the Reduce stage may exhibit higher imbalance across workers and dynamic scheduling is more important. The output of each Reduce task is already sorted by key. As the last step, the final output from all tasks is merged into a single buffer, sorted by keys.
Buffer Management
Two types of temporary buffers are necessary to store data between the various stages. All buffers are allocated in shared memory but are accessed in a well specified way by a few functions. To re-arrange buffers (e.g., split across tasks), pointer manipulation is done instead of the actual pairs, which may be large in size. The intermediate buffers are not directly visible to user code. Map-Reduce buffers are used to store the intermediate output pairs. Each worker has its own set of buffers. The buffers are initially sized to a default value and then resized dynamically as needed. At this stage, there may be multiple pairs with the same key. To accelerate the Partition function, the Emit intermediate function stores all values for the same key in the same buffer. At the end of the Map task, each buffer is sorted by key order. Reduce- Merge buffers are used to store the outputs of Reduce tasks before they are sorted. At this stage, each key has only one value associated with it. After sorting, the final output is available in the user allocated Output data buffer.
Map Reduce on Graphics Processors
Compared with CPUs, the hardware architecture of GPUs differs significantly. For instance, current GPUs have over one hundred SIMD (Single Instruction Multiple Data) processors whereas current multi-core CPUs offer a much smaller number of cores. Moreover, most GPUs do not support atomic operations or locks. Due to the architectural differences, there are following three technical challenges in implementing the MapReduce framework on the GPU. First, the synchronization overhead in the runtime system of the framework must be low so that the system can scale to hundreds of processors. Second, due to the lack of dynamic thread scheduling on current GPUs, it is essential to allocate work evenly across threads on the GPU to exploit its massive thread parallelism. Third, the core tasks of MapReduce programs, including string processing, file manipulation and concurrent reads and writes, are unconventional to GPUs and must be handled efficiently.
Mars, MapReduce framework on the GPU was designed and implemented with these challenges in mind. Mars provides a small set of APIs that are similar to those of CPU-based MapReduce. Runtime system utilizes a large number of GPU threads for Map or Reduce tasks, and automatically assigns each thread a small number of key/value pairs to work on. As a result, the massive thread parallelism on the GPU is well utilized. To avoid any conflict between concurrent writes, Mars has a lock-free scheme with low runtime overhead on the massive thread parallelism of the GPU. This scheme guarantees the correctness of parallel execution with little synchronization overhead.
Mars API
Mars provides a small set of APIs. Similar to the existing MapReduce frameworks, Mars has two kinds of APIs, the user-implemented APIs, which the users implement, and the system-provided APIs, which the users can use as library calls. Mars has the following user-implemented APIs. These APIs are implemented with C/C++. void* type has been used so that the developer can manipulate strings and other complex data types conveniently.
//MAP_COUNT counts result size of the map function. voidMAP_COUNT(void *key, void *val, int keySize, int valSize); //The map function. voidMAP(void *key, void* val, int keySize, int valSize); //REDUCE_COUNT counts result size of the reduce function. void REDUCE_COUNT(void* key, void* vals, int keySize, int valCount); //The reduce function. void REDUCE(void* key, void* vals, int keySize, int valCount);
Mars has the following four system-provided APIs. The emit functions are used in user-implemented map and reduce functions to output the intermediate/final results.
//Emit the key size and the value size inMAP_COUNT. void EMIT_INTERMEDIATE_COUNT(int keySize, int valSize); //Emit an intermediate result in MAP. void EMIT_INTERMEDIATE(void* key, void* val, int keySize, int valSize); //Emit the key size and the value size in REDUCE_COUNT. void EMIT_COUNT(int keySize, int valSize); //Emit a final result in REDUCE. void EMIT(void *key, void* val, int keySize, int valSize);
Overall, the APIs in Mars are similar to those in the existing MapReduce frameworks such as Hadoop and Phoenix. The major difference is that Mars needs two APIs to implement the functionality of each CPU-based API. One is to count the size of results, and the other one is to output the results. This is because the GPU does not support atomic operations, and the Mars runtime uses a two-step design for the result output.
Implementation Details
Since the GPU does not support dynamic memory allocation on the device memory during the execution of the GPU code, arrays are used as the main data structure. The input data, the intermediate result and the final result are stored in three kinds of arrays, i.e., the key array, the value array and the directory index. The directory index consists of an entry of <key offset, key size, value offset, value size> for each key/value pair. Given a directory index entry, the key or the value at the corresponding offset in the key array or the value array is fetched. With the array structure, for the input data as well as for the result output the space on the device memory is allocated before executing the GPU program. However, the sizes of the output from the map and the reduce stages are unknown.The output scheme for the map stage is similar to that for the reduce stage.
First, each map task outputs three counts, i.e., the number of intermediate results, the total size of keys (in bytes) and the total size of values (in bytes) generated by the map task. Based on key sizes (or value sizes) of all map tasks, the runtime system computes a prefix sum on these sizes and produces an array of write locations. A write location is the start location in the output array for the corresponding map task to write. Based on the number of intermediate results, the runtime system computes a prefix sum and produces an array of start locations in the output directory index for the corresponding map task. Through these prefix sums, the sizes of the arrays for the intermediate result is also known. Thus, the runtime allocates arrays in the device memory with the exact size for storing the intermediate results.
Second, each map task outputs the intermediate key/value pairs to the output array and updates the directory index. Since each map has its deterministic and non-overlapping positions to write to, the write conflicts are avoided. This two-step scheme does not require the hardware support of atomic functions. It is suitable for the massive thread parallelism on the GPU. However, it doubles the map computation in the worst case. The overhead of this scheme is application dependent, and is usually much smaller than that in the worst case.
Optimization Techniques
Memory Optimizations
Two memory optimizations are used to reduce the number of memory requests in order to improve the memory bandwidth utilization.
Coalesced accesses
The GPU feature of coalesced accesses is utilized to improve the memory performance. The memory accesses of each thread to the data arrays are designed according to the coalesced access pattern when applicable. Suppose there are T threads in total and the number of key/value pairs is N in the map stage. Thread i processes the (i + T • k )th (k=0,..,N/T) key/value pair. Due to the SIMD property of the GPU, the memory addresses from the threads within a thread group are consecutive and these accesses are coalesced into one. The figure below illustrates the map stage with and without the coalesced access optimization.
Accesses using built-in vector types
Accessing the values in the device memory can be costly, because the data values are often of different sizes and the accesses are hardly coalesced. Fortunately, GPUs such as G80 support built-in vector types such as char4 and int4. Reading built-in vectors fetches the entire vector in a single memory request. Compared with reading char or int, the number of memory requests is greatly reduced and the memory performance is improved.
Thread parallelism
The thread configuration, i.e., the number of thread groups and the number of threads per thread group, is related to multiple factors including, (1) the hardware configuration such as the number of multiprocessors and the on-chip computation resources such as the number of registers on each multiprocessor, (2) the computation characteristics of the map and the reduce tasks, e.g., they are memory- or computation-intensive. Since the map and the reduce functions are implemented by the developer, and their costs are unknown to the runtime system, it is difficult to find the optimal setting for the thread configuration at run time.
Handling variable-sized types
The variable-sized types are supported with the directory index. If two key/value pairs need to be swapped, their corresponding entries in the directory index are swapped without modifying the key and the value arrays. This choice is to save the swapping cost since the directory entries are typically much smaller than the key/value pairs. Even though swapping changes the order of entries in the directory index, the array layout is preserved and therefore accesses to the directory index can still be coalesced after swaps. Since strings are a typical variable-sized type, and string processing is common in web data analysis tasks, a GPU-based string manipulation library was developed for Mars. The operations in the library include strcmp, strcat, memset and so on. The APIs of these operations are consistent with those in C/C++ library on the CPU. The difference is that simple algorithms for these GPU-based string operations were used, since they usually handle small strings within a map or a reduce task. In addition, char4 is used to implement strings to optimize the memory performance.
Hashing
Hashing is used in the sort algorithm to store the results with the same key value consecutively. In that case, it is not needed that the results with the key values are in their strict ascending/ decreasing order. The hashing technique that hashes a key into a 32-bit integer is used, and the records are sorted according to their hash values. When two records are compared, their hash values are compared first. Only when their hash values are the same, their keys are fetched and compared. Given a good hash function, the probability of comparing the keys is low.
File manipulation
Currently, the GPU cannot directly access the data in the hard disk. Thus, the file manipulation with the assistance of the CPU is performed in three phases. First, the file I/O on the CPU is performed and the file data is loaded into a buffer in the main memory. To reduce the I/O stall, multiple threads are used to perform the I/O task. Second, the preprocessing on the buffered data is performed and the input key/value pairs are obtained. Finally, the input key/value pairs are copied to the GPU device memory.
Summary
Google’s runtime implementation targets large clusters of Linux PCs connected through Ethernet switches. Tasks are forked using remote procedure calls. Buffering and communication occurs by reading and writing files on a distributed file system. The locality optimizations focus mostly on avoiding remote file accesses. While such a system is effective with distributed computing, it leads to very high overheads if used with shared-memory systems that facilitate communication through memory and are typically of much smaller scale.
Phoenix is an implementation of MapReduce that uses shared memory in order to minimize the overheads of task spawning and data communication. With Phoenix,the programmer provides a simple, functional expression of the algorithm and leaves parallelization and scheduling to the runtime system.Phoenix leads to scalable performance for both multi-core chips and conventional symmetric multiprocessors. Phoenix automatically handles key scheduling decisions during parallel execution. Despite runtime overheads, results have shown that performance of Phoenix to that of parallel code written in P-threads API are almost similar. Nevertheless,there are also applications that do not fit naturally in the MapReduce model for which P-threads code performs significantly better.
References
1. http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/archive/mapreduce-osdi04.pdf
2. http://dl.acm.org/citation.cfm?id=1318097
3. http://dl.acm.org/citation.cfm?id=1454152
4. http://hadoop.apache.org/mapreduce/
5. http://code.google.com/edu/parallel/mapreduce-tutorial.html