Chapter 4. MapReduce For Skandium: Implementation details
4.2.3 Using a caching technique to resolve collisions
In order to resolve the problems described above, we implemented the scheme illustrated in Figure 4.3. To resolve the collisions in the ConcurrentHashMap, we added a thread-local cache in front of the shared data structure. The cache is implemented by two arrays of the same size: the first array holds the keys and the second holds the list of values associated with each key. A key and its associated values are stored at the same index in the two arrays. Since the cache stores only a small subset of the total pairs, more than one key is expected to map to the same cache entry.
For each pair, the thread first checks whether the corresponding key exists in the key cache. The index where a key should be located in the cache is determined by the key's hash code. If the key exists in the cache at that index, the list of values is updated locally in the values cache with no synchronization cost. If the cache entry is empty, a new entry for the key is created in the key cache, along with a new list of values at the corresponding index in the values cache; initially, the list stores only the pair's value. If the cache entry is already occupied by another key, the associated values must be written back to the hash map before the entry is replaced with a list for the new key. If the hash map already holds an entry for that key, the list of values in the cache entry is merged with the one in the hash map and the resulting list is stored back in the hash map. When the storing of the pairs finishes, a final stage merges all the lists of values stored in the caches with the corresponding lists in the hash map, to create a consistent view of the stored values before the partitioning.
The partition muscle is the same as in the previous schemes.
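The caching scheme described above can be sketched as follows. This is a minimal illustration, not the actual Skandium code: the class name, the cache size, and the use of parallel Object arrays are our own assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a thread-local cache in front of the shared map.
// Each thread owns one CachedStore instance; only writeBack() touches the
// shared ConcurrentHashMap.
class CachedStore<K, V> {
    private static final int CACHE_SIZE = 1024;   // assumed size, not from the text
    private final Object[] keys = new Object[CACHE_SIZE];     // key cache
    private final Object[] values = new Object[CACHE_SIZE];   // values cache (List<V> per slot)
    private final ConcurrentHashMap<K, List<V>> shared;

    CachedStore(ConcurrentHashMap<K, List<V>> shared) { this.shared = shared; }

    @SuppressWarnings("unchecked")
    void store(K key, V value) {
        // The cache index is determined by the key's hash code.
        int idx = (key.hashCode() & 0x7fffffff) % CACHE_SIZE;
        if (key.equals(keys[idx])) {
            // Hit: update the values cache locally, no synchronization.
            ((List<V>) values[idx]).add(value);
            return;
        }
        if (keys[idx] != null) {
            // Occupied by another key: write its values back to the shared map.
            writeBack((K) keys[idx], (List<V>) values[idx]);
        }
        // Empty (or just evicted) entry: create a new list holding only this value.
        List<V> list = new ArrayList<>();
        list.add(value);
        keys[idx] = key;
        values[idx] = list;
    }

    private void writeBack(K key, List<V> vals) {
        // Merge the cached list with any list already stored for this key.
        shared.merge(key, vals, (oldList, newList) -> {
            oldList.addAll(newList);
            return oldList;
        });
    }

    // Final stage: flush every cached list into the shared map so it holds
    // a consistent view before partitioning.
    @SuppressWarnings("unchecked")
    void flush() {
        for (int i = 0; i < CACHE_SIZE; i++) {
            if (keys[i] != null) {
                writeBack((K) keys[i], (List<V>) values[i]);
                keys[i] = null;
                values[i] = null;
            }
        }
    }
}
```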
4.2.4 Implementing Phoenix's storing/partitioning scheme
Finally, we implemented a scheme similar to the one that Phoenix uses for storing and partitioning the intermediate key-value pairs. As in the previous scheme, a Skandium Map skeleton coordinates the storing of the pairs. However, in this case each store muscle uses its own hashArray to store the intermediate pairs. The hashArrays have a fixed number of 256 entries. Each entry of the hashArray contains an ArrayList of Combiner objects sorted by key. The hashArray is illustrated in Figure 4.4, which shows how the combiners are stored in the sorted lists, as well as the internal structure of a stored combiner whose key is the String "Edinburgh". This scheme
does not use one of Java's built-in hash structures because we want to have more control over how the pairs are stored. As we will see next, this simplifies the partitioning process.
Figure 4.3: A caching technique to resolve collisions
For each pair, the store thread first picks the ArrayList in the hashArray entry determined by the key's hash code. It then checks whether a combiner with the same key is already present in the list. If so, it adds the pair's value to the existing combiner; otherwise, it creates a new combiner object to hold the pair's key and value and adds it to the sorted list at the appropriate position.
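The store step can be sketched as below. The Combiner class and the binary-search insertion are our own assumptions about the structure the text describes; only the fixed size of 256 entries and the sorted-by-key ArrayLists come from the text.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a Combiner: one key plus the values stored for it.
class Combiner implements Comparable<Combiner> {
    final String key;
    final List<Object> values = new ArrayList<>();

    Combiner(String key) { this.key = key; }

    @Override
    public int compareTo(Combiner other) { return key.compareTo(other.key); }
}

// Each store thread owns one HashArrayStore; no synchronization is needed.
class HashArrayStore {
    static final int ENTRIES = 256;   // fixed number of entries, as in the text

    @SuppressWarnings("unchecked")
    final List<Combiner>[] hashArray = (List<Combiner>[]) new List[ENTRIES];

    HashArrayStore() {
        for (int i = 0; i < ENTRIES; i++) hashArray[i] = new ArrayList<>();
    }

    void store(String key, Object value) {
        // Pick the entry determined by the key's hash code.
        List<Combiner> list = hashArray[(key.hashCode() & 0x7fffffff) % ENTRIES];
        // The list is sorted by key, so binary search finds an existing combiner.
        int pos = Collections.binarySearch(list, new Combiner(key));
        if (pos >= 0) {
            list.get(pos).values.add(value);       // combiner exists: append the value
        } else {
            Combiner c = new Combiner(key);        // new combiner for key and value,
            c.values.add(value);
            list.add(-pos - 1, c);                 // inserted so the list stays sorted
        }
    }
}
```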
Unlike the other three schemes, where all the values with the same key are grouped together when the storing ends, in this scheme the values can be scattered across two or more combiners, as each store thread uses its own hashArray to store the pairs. This means that we need an extra step after the store phase to locate and merge the different combiners with the same key before passing them to the reducers, as a reducer expects a single combiner per key at its input. The complexity in this task is that we know neither in which hashArrays identical combiners are present nor at which index in the sorted ArrayLists they are stored. What we do know, though, is that combiners with the same key are stored at the same index in all hashArrays, because every store thread determines the index where a combiner is stored from the key's hash code.
Merging two or more combiners with the same key into a single object happens in the join stage, which is illustrated in Figure 4.5. The output of each store muscle is the hashArray populated with the combiners. The hashArrays are then merged together at the merge muscle to create a hashMatrix, which is further forwarded to the join skeleton.
Figure 4.4: The hashArray that is used by each store thread to store the intermediate pairs.
Figure 4.5: Phoenix’s storing/partitioning scheme
The split muscle of the skeleton partitions the matrix into chunks of columns, which are then processed by a join muscle. For each column, the join muscle merges the sorted lists of each entry together to create a single list, sorted by key. The merging phase for a single column is illustrated in Figure 4.6 for a word-count application where 4 hashArrays are used. If two combiners with the same key are found during the merge phase, they are merged into a single combiner: the values of one are copied to the list of values of the other.
Figure 4.6: The merge phase in the case of 4 rows. The merge for a single column of the hash matrix is illustrated.
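The column merge described above can be sketched as follows. For simplicity this sketch concatenates the rows' lists and re-sorts them before folding equal keys together, rather than performing a k-way merge of the already-sorted lists; the Combiner class and all names are our own assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ColumnMerge {
    // Minimal Combiner, as assumed in this sketch: one key plus its values.
    static class Combiner implements Comparable<Combiner> {
        final String key;
        final List<Object> values = new ArrayList<>();

        Combiner(String key) { this.key = key; }

        @Override
        public int compareTo(Combiner other) { return key.compareTo(other.key); }
    }

    // Merge one column of the hash matrix: the sorted Combiner lists from every
    // row become a single list, sorted by key, with one combiner per key.
    static List<Combiner> mergeColumn(List<List<Combiner>> rows) {
        List<Combiner> all = new ArrayList<>();
        for (List<Combiner> row : rows) all.addAll(row);
        Collections.sort(all);   // stable sort brings equal keys next to each other
        List<Combiner> merged = new ArrayList<>();
        for (Combiner c : all) {
            if (!merged.isEmpty()
                    && merged.get(merged.size() - 1).key.equals(c.key)) {
                // Same key found twice: copy the values of one combiner
                // into the list of values of the other.
                merged.get(merged.size() - 1).values.addAll(c.values);
            } else {
                merged.add(c);
            }
        }
        return merged;
    }
}
```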
When the merge phase ends, we have a single ArrayList for each column, in which all the stored Combiner objects have unique keys. The muscle creates a single Collection out of the ArrayLists for all the columns that have been assigned to it and sends the collection to the merge muscle of the skeleton. The merge skeleton creates a single collection out of the many it receives from the join muscles and forwards it to the last Skandium Map skeleton in the pipeline, which partitions the collection among the reducers.