Chapter 4. MapReduce For Skandium: Implementation details
26
one of the predefined muscles in the repository. The constructor creates a new Map
skeleton responsible for the Map stage in line 5. The skeleton is fleshed with the
user-provided splitter and mapper, as well as with a new Store muscle, which is the
generic muscle responsible for storing the intermediate pairs. Similarly, in line 6,
a Map skeleton for the reduce stage is created and fleshed with the generic Partition
muscle, along with the user-provided reducer and the final merge muscle. The two Map
skeletons are then nested inside a Skandium Pipe skeleton in line 7.
This constructor is used in our simplest implementation, which is described in the
following section. In more complex schemes, the MapReduce pipeline is a combination
of several Skandium skeletons. For clarity, we present here the constructor of the
simplest scheme.
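The nesting of two Map skeletons inside a Pipe can be modelled without the Skandium library. The following is a minimal sequential sketch under stated assumptions: `Split`, `Execute` and `Merge` are hypothetical interfaces that only mimic the roles of the muscles (they are not Skandium's real interfaces), `mapSkeleton` is a hypothetical stand-in for a Map skeleton, `pipe` stands in for the Pipe skeleton, and the word-count muscles are illustrative. Nothing runs in parallel here; the point is only the shape Pipe(Map(splitter, mapper, store), Map(partition, reducer, merge)).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Sequential, library-free model of the MapReduce pipeline (hypothetical names). */
public class PipelineSketch {

    // Hypothetical stand-ins for the three muscle roles; NOT Skandium's interfaces.
    interface Split<P, X> { List<X> split(P input); }
    interface Execute<X, Y> { Y execute(X part); }
    interface Merge<Y, R> { R merge(List<Y> results); }

    /** Stand-in for a Map skeleton: split the input, execute on every part, merge the results. */
    static <P, X, Y, R> Function<P, R> mapSkeleton(Split<P, X> split, Execute<X, Y> exec, Merge<Y, R> merge) {
        return input -> {
            List<Y> results = new ArrayList<>();
            for (X part : split.split(input)) results.add(exec.execute(part));
            return merge.merge(results);
        };
    }

    /** Stand-in for a Pipe skeleton: the output of the first stage feeds the second. */
    static <P, X, R> Function<P, R> pipe(Function<P, X> first, Function<X, R> second) {
        return first.andThen(second);
    }

    /** Word count assembled as Pipe(Map(splitter, mapper, store), Map(partition, reducer, merge)). */
    public static Function<String, Map<String, Integer>> wordCount(int reducers) {
        if (reducers <= 0) throw new IllegalArgumentException("reducers must be positive");

        // Map stage: user splitter and mapper, generic store.
        Split<String, String> splitter = text -> Arrays.asList(text.split("\n"));
        Execute<String, List<Map.Entry<String, Integer>>> mapper = line -> {
            List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
            for (String w : line.trim().split("\\s+")) if (!w.isEmpty()) pairs.add(Map.entry(w, 1));
            return pairs;
        };
        Merge<List<Map.Entry<String, Integer>>, Map<String, List<Integer>>> store = outputs -> {
            Map<String, List<Integer>> grouped = new HashMap<>();
            for (List<Map.Entry<String, Integer>> out : outputs)
                for (Map.Entry<String, Integer> p : out)
                    grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
            return grouped;
        };

        // Reduce stage: generic partition, user reducer, final merge.
        Split<Map<String, List<Integer>>, List<Map.Entry<String, List<Integer>>>> partition = grouped -> {
            List<List<Map.Entry<String, List<Integer>>>> parts = new ArrayList<>();
            for (int r = 0; r < reducers; r++) parts.add(new ArrayList<>());
            int next = 0;
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                parts.get(next).add(e);          // deal entries round-robin over the reducers
                next = (next + 1) % reducers;
            }
            return parts;
        };
        Execute<List<Map.Entry<String, List<Integer>>>, Map<String, Integer>> reducer = part -> {
            Map<String, Integer> sums = new HashMap<>();
            for (Map.Entry<String, List<Integer>> e : part) {
                int sum = 0;
                for (int v : e.getValue()) sum += v;
                sums.put(e.getKey(), sum);
            }
            return sums;
        };
        Merge<Map<String, Integer>, Map<String, Integer>> merge = partials -> {
            Map<String, Integer> result = new TreeMap<>();   // sorted for readable output
            for (Map<String, Integer> m : partials) result.putAll(m);
            return result;
        };

        return pipe(mapSkeleton(splitter, mapper, store), mapSkeleton(partition, reducer, merge));
    }

    public static void main(String[] args) {
        System.out.println(wordCount(2).apply("a b a\nb c\na")); // prints {a=3, b=2, c=1}
    }
}
```

Because each key is sent to exactly one reducer, the final merge can simply combine the partial maps without summing across them.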
4.2 Implementation of the generic muscles
In the following sections, we present four different schemes that were considered for
storing and partitioning the intermediate pairs, along with the advantages and
disadvantages of each approach. A detailed performance evaluation of each scheme
follows in the next chapter.
4.2.1 An initial approach
An initial approach for grouping and partitioning the intermediate key-value pairs is
illustrated in Figure 4.1. In this scheme, a Java HashMap is used for storing the
intermediate pairs. The HashMap object maps each emitted key to a list that holds all
the values associated with that key.
The output of each mapper is a collection of key-value pairs, which is passed as input
to the store muscle, a library-defined muscle that implements Skandium's merge
interface. For each pair in the collections, the store muscle first checks whether the
key is already present in the HashMap. If it is, the value of the pair is added to the
list of values of the corresponding HashMap entry. If the key is not present, a new
HashMap entry is created: the pair's key becomes the key of the new entry, and the
entry's value is a new list that initially contains only the pair's value. In this way,
the values associated with the same key are grouped together in a single HashMap
entry during the storing phase. This reduces the size of the HashMap and also makes it
easier to send all pairs with the same key to the same reducer.
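The grouping performed by the store muscle can be sketched in plain Java. This is a minimal, single-threaded sketch under stated assumptions: the `Pair` record and the `store` method are hypothetical names rather than Skandium's API, and each mapper's output is assumed to arrive as a `List` of pairs. `Map.computeIfAbsent` covers both cases described above in one call: it creates a new list for an unseen key and returns the existing list otherwise.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the single-threaded store step (hypothetical names, not Skandium's API). */
public class StoreSketch {

    /** Minimal key-value pair as emitted by a mapper (hypothetical type). */
    public record Pair<K, V>(K key, V value) {}

    /** Groups every pair from every mapper's output under its key in one HashMap. */
    public static <K, V> Map<K, List<V>> store(List<List<Pair<K, V>>> mapperOutputs) {
        Map<K, List<V>> intermediate = new HashMap<>();
        for (List<Pair<K, V>> output : mapperOutputs) {
            for (Pair<K, V> p : output) {
                // Known key: append to its list. New key: create a list holding only this value.
                intermediate.computeIfAbsent(p.key(), k -> new ArrayList<>()).add(p.value());
            }
        }
        return intermediate;
    }

    public static void main(String[] args) {
        List<List<Pair<String, Integer>>> outputs = List.of(
                List.of(new Pair<>("a", 1), new Pair<>("b", 1)),   // output of mapper 1
                List.of(new Pair<>("a", 1)));                      // output of mapper 2
        System.out.println(store(outputs));
    }
}
```

A single thread executes the whole loop, which is why a plain HashMap is sufficient here.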
Figure 4.1: Storing and Partitioning the intermediate pairs: An initial approach
The Partition muscle generates as many Collections of Combiners as there are
reducers. The muscle iterates over the entries of the HashMap and encapsulates the key
and the list of values of each entry into a Combiner object. It then adds each Combiner
to one of the collections, in such a way that all the collections hold approximately the
same number of Combiners at the end. Each collection is then passed to a different
reducer.
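The Partition muscle can be sketched as follows, assuming a hypothetical `Combiner` record that holds one key and its list of values. The text above only states that the collections end up with approximately the same number of Combiners; dealing them out round-robin, as done here, is one simple way to achieve that (sizes differ by at most one) and is an assumption, not necessarily the strategy of the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of the Partition muscle (hypothetical names, not Skandium's API). */
public class PartitionSketch {

    /** Hypothetical Combiner: one key together with all of its grouped values. */
    public record Combiner<K, V>(K key, List<V> values) {}

    /**
     * Wraps every map entry in a Combiner and deals the Combiners round-robin into
     * numReducers collections, one per reducer.
     */
    public static <K, V> List<List<Combiner<K, V>>> partition(Map<K, List<V>> intermediate, int numReducers) {
        if (numReducers <= 0) throw new IllegalArgumentException("numReducers must be positive");
        List<List<Combiner<K, V>>> parts = new ArrayList<>(numReducers);
        for (int i = 0; i < numReducers; i++) parts.add(new ArrayList<>());
        int next = 0;
        for (Map.Entry<K, List<V>> e : intermediate.entrySet()) {
            parts.get(next).add(new Combiner<>(e.getKey(), e.getValue()));
            next = (next + 1) % numReducers;   // move on to the next reducer's collection
        }
        return parts;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = Map.of("a", List.of(1, 1), "b", List.of(1), "c", List.of(1, 1, 1));
        for (List<Combiner<String, Integer>> part : partition(grouped, 2)) {
            System.out.println(part.size() + " combiner(s): " + part);
        }
    }
}
```

Since every key lives in exactly one HashMap entry, each key ends up in exactly one Combiner and therefore reaches exactly one reducer.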
A HashMap was used in this scheme to store the intermediate pairs because only a
single thread accesses the data structure, and in a single-threaded setting the HashMap
performs better than similar structures that map keys to values, such as the Hashtable
or the ConcurrentHashMap, which incur thread-safety overhead that is unnecessary here.
The scheme described above is expected to perform well for workloads in which the
mappers emit a small number of pairs. However, the single store thread can easily
become a sequential bottleneck when the map phase has a high degree of parallelism and
each mapper produces a large number of intermediate pairs.
4.2.2 Parallelizing the Store muscle
To remove the sequential bottleneck in the store phase, we parallelized the store
muscle. The new scheme is illustrated in Figure 4.2. In this scheme, the merge muscle
does not store the pairs in the intermediate data structure directly; instead, it merges
the many collections into a single collection and passes it to a separate Skandium
Map skeleton that orchestrates the parallel storing of the pairs. In the current
implementation, the number of store muscles is equal to the number of mappers. The
split muscle of this skeleton splits the single collection into multiple collections,
one for each store muscle. The store muscles then run in parallel and store the pairs
in the intermediate data structure. Note that in this scheme the HashMap has been
replaced with a ConcurrentHashMap, to allow multiple threads to access it at the same
time. The partition muscle is the same as in the previous scheme.
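A minimal sketch of the parallel store stage, assuming a fixed thread pool from `java.util.concurrent` in place of Skandium's Map skeleton: the hypothetical `split` method plays the role of the split muscle, and each submitted task plays the role of one store muscle writing into a shared ConcurrentHashMap. Wrapping each per-key list with `Collections.synchronizedList` is an assumption about how the lists are locked; it makes every `add` take the list's own lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of the parallel store stage (hypothetical names, not Skandium's API). */
public class ParallelStoreSketch {

    /** Hypothetical key-value pair emitted by the mappers. */
    public record Pair<K, V>(K key, V value) {}

    /** Split-muscle stand-in: cuts the merged collection into numStores near-equal chunks. */
    static <T> List<List<T>> split(List<T> all, int numStores) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < numStores; i++) {
            int from = (int) ((long) all.size() * i / numStores);
            int to = (int) ((long) all.size() * (i + 1) / numStores);
            chunks.add(all.subList(from, to));
        }
        return chunks;
    }

    /** Runs numStores store tasks in parallel over one shared ConcurrentHashMap. */
    public static <K, V> Map<K, List<V>> parallelStore(List<Pair<K, V>> merged, int numStores) {
        if (numStores <= 0) throw new IllegalArgumentException("numStores must be positive");
        Map<K, List<V>> intermediate = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(numStores);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<Pair<K, V>> chunk : split(merged, numStores)) {
                pending.add(pool.submit(() -> {
                    for (Pair<K, V> p : chunk) {
                        // computeIfAbsent is atomic on ConcurrentHashMap, so each key gets exactly one list;
                        // the synchronized list then locks itself on every add.
                        intermediate.computeIfAbsent(p.key(),
                                k -> Collections.synchronizedList(new ArrayList<>())).add(p.value());
                    }
                }));
            }
            for (Future<?> f : pending) {
                try {
                    f.get(); // wait until this store task has finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while storing", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("store task failed", e.getCause());
                }
            }
        } finally {
            pool.shutdown();
        }
        return intermediate;
    }

    public static void main(String[] args) {
        List<Pair<String, Integer>> merged = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) merged.add(new Pair<>("k" + (i % 3), 1));
        Map<String, List<Integer>> stored = parallelStore(merged, 4);
        stored.forEach((k, vs) -> System.out.println(k + " -> " + vs.size() + " values"));
    }
}
```

With only three distinct keys in `main`, all four tasks keep appending to the same three lists, so nearly every `add` competes for one of three locks; the order of values within a list is also nondeterministic.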
This scheme is expected to perform better than the previous one for workloads that
produce a large number of intermediate pairs. However, it is still problematic for
workloads that produce many duplicate keys. If only a small number of keys are
repeated many times, then during the storing phase two or more threads will very
frequently attempt to update the list of values associated with the same key at the
same time. Since a thread must lock a list before adding a value to it, these
collisions on the same keys and the resulting synchronization costs on the lists of
values will quickly degrade the performance of the application. Another problem with
this scheme is that many duplicate keys lead to many cache invalidations, since all
threads constantly update the same few lists of values, which are shared among them.
Figure 4.2: The parallel store muscle