Contributing to Apache Spark: Implementing Mode with TreeMap, Part 1/4
Introduction
In this four-part series, I will share my journey contributing to Apache Spark, specifically focusing on the implementation of the ‘Mode’ function. This first post will detail the initial prototype using a scala.collection.mutable.TreeMap
.
Collations and Their Importance
Collations are systems for arranging data, often considering cultural and linguistic nuances. For example, in the yellow pages, the collations used varied,
1. some treated surnames beginning with mac and Mc without special treatment.
2. Others treated Mc as if it were Mac (interleaving the two) but kept Mac/Mc in the M section (after Mabrey)
3. And many placed Mac and Mc before the rest of the M-section, as either two separate sections or interleaved.
Fundamental Collations
The fundamental groups of collations for basic database support are:
- UNICODE COLLATION ALGORITHM (UCA) which is not code point order.
- UTF8 Byte order, which is also not code point order.
Additional fundamental collations include:
- Case-insensitive, and/or accent-insensitive, and/or LOCALE specification, applied to UCA
- Case-insensitive UTF8 Byte order
Background: Alphabetical Order
In mathematics and linguistics, alphabets are sets of letters. In common usage, an alphabet implies an ordered list due to alphabetical order. The Proto-Sinaitic script (c. 19th–16th century BCE), evolved into the Phoenician alphabet, which is the ancestor of all alphabetical languages. It is not clear to what extent alphabetical ordering of library scrolls were ever used, prior to the Greeks. But the alphabet and its order was known and widely practiced by scribes.
Byte Order and Roman Numerals
Automated collation often relies on numerical codes of symbols, such as ASCII or Unicode, ordering characters by increasing numerical codes and extending this to strings lexicographically. For instance, a program might sort characters $, C, a, b, d as $ (36), C (67), a (97), b (98), d (100), deviating from standard alphabetical order with capitals before lowercase.
In the case of ascii characters, byte order, code point order, and UCA order are all agreed. However this might not always be the case:
While none of the collations are by code point, it is interesting to look at Byte Order Collation vs. Lexicographic Order of Code Points
Byte Order Collation:
• Characters are compared byte by byte according to their UTF-8 encoding.
• The order is determined by the binary values of the bytes.
Lexicographic Order of Code Points:
• Characters are compared by their Unicode code points.
• The order is determined by the numerical values of the code points.
Example String Comparison:
Consider the strings “Aßa” and “Aaß”:
• “A” has the code point U+0041 and is encoded as 0x41 in UTF-8.
• “ß” (sharp S) has the code point U+00DF and is encoded as 0xC3 0x9F in UTF-8.
• “a” has the code point U+0061 and is encoded as 0x61 in UTF-8.
String 1: “Aßa”
• UTF-8 encoding: 0x41 0xC3 0x9F 0x61
String 2: “Aaß”
• UTF-8 encoding: 0x41 0x61 0xC3 0x9F
Byte Order Collation:
1. Compare the first byte of each string:
• Both start with 0x41 (“A”), so they are equal.
2. Compare the second byte:
• String 1: 0xC3
• String 2: 0x61
• 0xC3 (195 in decimal) > 0x61 (97 in decimal)
Since 0xC3 > 0x61, in byte order collation, “Aßa” > “Aaß”.
Lexicographic Order of Code Points:
1. Compare the first character of each string:
• Both are “A” (U+0041), so they are equal.
2. Compare the second character:
• String 1: “ß” (U+00DF)
• String 2: “a” (U+0061)
• U+00DF (223 in decimal) > U+0061 (97 in decimal)
Since U+00DF > U+0061, in lexicographic order of code points, “Aßa” > “Aaß”.
Unicode Collation Algorithm
The Unicode Collation Algorithm is a standard for collating strings composed of Unicode symbols. It can be tailored for specific languages by adjusting its default collation table, with several tailorings collected in the Common Locale Data Repository.
In some applications, collation strings differ from displayed identifiers. For instance, "The Shining" might be sorted as "Shining, The." These collation strings, called sort keys, will be explored further in parts 3 and 4. Common word rules such as “The” moving to the end of the movie title is out of scope of UCA or any collations used by databases, though this can be accomplished by a column separate from “displayed_movie_title”.
TreeMap in Mode Function Implementation
The choice to prototype the Mode function using a TreeMap stemmed from its ability to maintain elements in sorted order according to a comparator, essential when collation settings affect string data equality. TreeMap uses a Red-Black Tree (RB Tree), ensuring balanced operations with logarithmic time complexity for insertions, deletions, and searches.
Technical Discussion: TreeMap and Mode Function
TreeMap was considered for several reasons:
- Support for Ordered Comparisons: TreeMap's comparator determines equality, supporting efficient counts by keys equal under the comparator but not by default physical equality used by hash maps.
Code Example
Here's how TreeMap was utilized in the proposed Mode function:
scalaOrdering.comparatorToOrdering(CollationFactory.fetchCollation(collationId).comparator)) { case (map, (key: String, count)) => map(org.apache.spark.unsafe.types.UTF8String.fromString(key)) = map.getOrElse(org.apache.spark.unsafe.types.UTF8String.fromString(key), 0L) + count }
After which the map was sorted by count and then lexicographically, and the head of the list was chosen.
Reflections on Using TreeMap
Benchmarks revealed high overhead of constructing a new data structure, or at least this one.
Some of the practical challenges that concerned me about this approach still concern me: handling high cardinality datasets and the absence of parallelized bulk operations.
Example Use-Case
Consider the following SQL query, which demonstrates the practical implications of collation support:
sqlSELECT mode(col) FROM VALUES ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b') AS tab(col);
With UTF8_BINARY
collation, the query should return 'a', reflecting a simple binary comparison. However, with UTF8_BINARY_LCASE
collation, the result might be 'B' or 'b', depending on how the system interprets lowercase and uppercase letters in sorting and comparison.
Data Structures Already Used By Mode
The TypedImperativeAggregate
interface extends ImperativeAggregate
functionality by allowing the aggregation buffer to be a user-defined Java object. This flexibility enables custom aggregation functions tailored to specific data types.
TypedImperativeAggregate
can optimize performance based on data characteristics, significantly reducing JVM boxing and unboxing overhead. We will discuss the memory efficiency of using OpenHashMap
.
The way these structures are used support distributed computing environments, with efficient serialization and deserialization, suitable for horizontal scaling. The parallel model entails three functions: eval and merge. Put simply and archaically, Eval is to Map what Merge is to Reduce.
The TypedAggregateWithHashMapAsBuffer
implementation of TypedImperativeAggregate
maintains a map from keys to longs, representing counts in current use-cases. Ideally, a generic TypedAggregateWithHashMapAsBuffer
with a generic HashMap would be used, but in this case, generics are defined as AnyRef
mapping to a Long
. The OpenHashMap
data structure is chosen for its efficiency with mutable keys and values, allowing rapid access and update operations essential for aggregation tasks.
Operations and Relevance
Space Complexity:
- Space: O(n)
Time Complexity:
- Search: O(log n) (amortized and worst-case)
- Insert: O(log n) (amortized and worst-case)
- Delete: O(log n) (amortized and worst-case)
Deletion: Not used in this context.
Searching: Relevant, especially since each insertion involves a getOrElse operation, leading to n searches for n collation-unaware keys. For example, if the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), (‘b’), then n = 3, resulting in n log n total searches.
Insertion: Relevant, as each insertion operation requires rebalancing the tree, involving multiple rotations and color changes. While these operations are logarithmic in time complexity, the constants involved can be significant, especially with large datasets. For this implementation, there are m rebalancing insertions, with k being the number of collation-aware keys. If the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), (‘b’), then k = 2. Using collation keys and a hash-based data structure would make k linearly related to m, where m is the number of elements with colliding hashcodes.
Bulk Insertion: Would be relevant if supported by TreeMap. However, TreeMap lacks a dedicated bulk insertion method, which could be seen as an oversight. Bulk operations could potentially be optimized to reduce the number of rebalancings. TreeMap in Scala does not have a built-in method specifically optimized for bulk insertions. Instead, it relies on individual insertions, leading to multiple rebalancings when adding elements one by one.
Parallel algorithms for constructing red-black trees from sorted lists of items can run in constant time or O(log log n) time, depending on the computer model and the number of processors available. However, since sorting is O(n log n), we do not asymptotically benefit from a bulk insert requiring sorted order.
Throughout this process, I considered that sometimes the cardinality of elements for the mode is low, even in large datasets. For example, a 20 TB dataframe with billions of rows might have a trivial TreeMap size by the time Spark calls Mode.eval(). However, high cardinality cases illustrate that constructing a TreeMap with billions of keys can consume significant space and time, potentially overloading the master node and lacking parallelization.
Ultimately, it is not scalable to perform all this work inside the eval function on the master node, as the data structure is not serialized or deserialized, and cannot be shuffled or persisted. The next step would involve changing the Mode function to rely on a new implementation of TypedImperativeAggregate using a TreeMap or another comparison-based map, rather than a HashMap.
Next Steps and Preview
In the upcoming post, will be about modifying the OpenHashMap and OpenHashSet, the second approach we tried.
Further Reading:
Unicode Collation: https://www.unicode.org/reports/tr10/#Common_Misperceptions