Contributing to Apache Spark: Implementing Mode with TreeMap, Part 1/4

Introduction

In this four-part series, I will share my journey contributing to Apache Spark, specifically focusing on the implementation of the ‘Mode’ function. This first post will detail the initial prototype using a scala.collection.mutable.TreeMap.

Collations and Their Importance

Collations are systems for arranging data, often taking cultural and linguistic nuances into account. For example, yellow-pages directories varied in the collations they used:

1. Some treated surnames beginning with Mac and Mc with no special treatment.

2. Others treated Mc as if it were spelled Mac (interleaving the two) but kept Mac/Mc entries in the M section (after Mabrey).

3. Many placed Mac and Mc before the rest of the M section, either as two separate sections or interleaved.

Fundamental Collations

The fundamental groups of collations for basic database support are:

  1. The Unicode Collation Algorithm (UCA), which is not code point order.
  2. UTF-8 byte order, which, by design of the UTF-8 encoding, coincides with code point order.

Additional fundamental collations include:

  1. UCA with case-insensitivity, accent-insensitivity, and/or a LOCALE specification applied
  2. Case-insensitive UTF-8 byte order

Background: Alphabetical Order

In mathematics and linguistics, an alphabet is a set of letters. In common usage, an alphabet implies an ordered list, thanks to alphabetical order. The Proto-Sinaitic script (c. 19th–16th century BCE) evolved into the Phoenician alphabet, the ancestor of most alphabets in use today. It is not clear to what extent alphabetical ordering of library scrolls was ever used prior to the Greeks, but the alphabet and its order were known and widely practiced by scribes.

Byte Order and Roman Numerals

Automated collation often relies on numerical codes of symbols, such as ASCII or Unicode, ordering characters by increasing numerical codes and extending this to strings lexicographically. For instance, a program might sort characters $, C, a, b, d as $ (36), C (67), a (97), b (98), d (100), deviating from standard alphabetical order with capitals before lowercase.
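
This is easy to see on the JVM, since Scala's default character ordering is exactly this numeric-code ordering. A minimal sketch using only the standard library:

scala
// Scala's default Ordering[Char] compares numeric character codes,
// which reproduces the $-before-capitals-before-lowercase order.
val chars = Seq('a', 'C', 'd', '$', 'b')
println(chars.sorted.map(c => s"$c (${c.toInt})").mkString(", "))
// $ (36), C (67), a (97), b (98), d (100)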

In the case of unaccented lowercase ASCII text, byte order, code point order, and UCA order all agree. Once capitals, punctuation, or non-ASCII characters enter the picture, however, the orders can diverge:

Although no Spark collation is defined directly in terms of code points, it is instructive to walk through Byte Order Collation and Lexicographic Order of Code Points side by side.

Byte Order Collation:

Characters are compared byte by byte according to their UTF-8 encoding.

The order is determined by the binary values of the bytes.

Lexicographic Order of Code Points:

Characters are compared by their Unicode code points.

The order is determined by the numerical values of the code points.


Example String Comparison:


Consider the strings “Aßa” and “Aaß”:

“A” has the code point U+0041 and is encoded as 0x41 in UTF-8.

“ß” (sharp S) has the code point U+00DF and is encoded as 0xC3 0x9F in UTF-8.

“a” has the code point U+0061 and is encoded as 0x61 in UTF-8.


String 1: “Aßa”

UTF-8 encoding: 0x41 0xC3 0x9F 0x61


String 2: “Aaß”

UTF-8 encoding: 0x41 0x61 0xC3 0x9F


Byte Order Collation:

1. Compare the first byte of each string:

Both start with 0x41 (“A”), so they are equal.

2. Compare the second byte:

String 1: 0xC3

String 2: 0x61

0xC3 (195 in decimal) > 0x61 (97 in decimal)


Since 0xC3 > 0x61, in byte order collation, “Aßa” > “Aaß”.


Lexicographic Order of Code Points:

1. Compare the first character of each string:

Both are “A” (U+0041), so they are equal.

2. Compare the second character:

String 1: “ß” (U+00DF)

String 2: “a” (U+0061)

U+00DF (223 in decimal) > U+0061 (97 in decimal)


Since U+00DF > U+0061, in lexicographic order of code points, “Aßa” > “Aaß” as well. The two methods agree here, and in fact they always will: UTF-8 was designed so that byte-wise lexicographic comparison of valid UTF-8 strings reproduces code point order exactly.
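
The comparison can be verified directly on the JVM. A minimal sketch using only the Java standard library (Arrays.compareUnsigned requires Java 9+):

scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

val s1 = "Aßa"
val s2 = "Aaß"

// Byte order: lexicographic comparison of the UTF-8 bytes as unsigned values.
val byteCmp = Arrays.compareUnsigned(
  s1.getBytes(StandardCharsets.UTF_8),
  s2.getBytes(StandardCharsets.UTF_8))

// Code point order: String.compareTo compares UTF-16 code units, which for
// these BMP-only strings is the same as comparing code points.
val cpCmp = s1.compareTo(s2)

println((byteCmp > 0, cpCmp > 0)) // (true, true): "Aßa" > "Aaß" under both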

Unicode Collation Algorithm

The Unicode Collation Algorithm is a standard for collating strings composed of Unicode symbols. It can be tailored for specific languages by adjusting its default collation table, with several tailorings collected in the Common Locale Data Repository.
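For experimenting with UCA-style, locale-tailored comparison on the JVM, java.text.Collator is a convenient stand-in (Spark's collation support is built on the ICU library's Collator rather than java.text; Locale.US here is just an example tailoring):

scala
import java.text.Collator
import java.util.Locale

val collator = Collator.getInstance(Locale.US)
// Byte order puts "äpple" after "zebra" (0xC3.. > 0x7A), but a
// UCA-style collator sorts "ä" with "a", well before "z".
println("äpple".compareTo("zebra") > 0)         // true: code-unit order
println(collator.compare("äpple", "zebra") < 0) // true: collated order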

In some applications, collation strings differ from displayed identifiers. For instance, "The Shining" might be sorted as "Shining, The." These collation strings, called sort keys, will be explored further in parts 3 and 4. Common-word rules, such as moving "The" to the end of a movie title, are out of scope for UCA or for any collation used by databases, though the same effect can be achieved with a column separate from "displayed_movie_title".
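
Sort keys can be materialized explicitly with Collator.getCollationKey, which produces keys whose binary comparison reproduces the collated order. A minimal sketch of the idea (not Spark's implementation):

scala
import java.text.Collator
import java.util.Locale

val collator = Collator.getInstance(Locale.US)
// A CollationKey is a precomputed byte sequence whose binary order matches
// the collator's order, so it can be computed once per row and compared cheaply.
val titles = Seq("The Shining", "Shining, The", "Sharknado")
val sorted = titles.sortBy(collator.getCollationKey)
println(sorted) // List(Sharknado, "Shining, The", The Shining)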

TreeMap in Mode Function Implementation

The choice to prototype the Mode function using a TreeMap stemmed from its ability to maintain elements in sorted order according to a comparator, essential when collation settings affect string data equality. TreeMap uses a Red-Black Tree (RB Tree), ensuring balanced operations with logarithmic time complexity for insertions, deletions, and searches.

Technical Discussion: TreeMap and Mode Function

TreeMap was considered chiefly for one reason:

  • Support for Ordered Comparisons: a TreeMap's comparator determines key equality, so it can count keys that are equal under the comparator; the physical (hash-and-equals) equality used by default in hash maps cannot do this.

Code Example

Here's how TreeMap was utilized in the proposed Mode function:

scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.unsafe.types.UTF8String

// Build a TreeMap ordered by the collation's comparator, so that keys which
// compare as equal under the collation are counted together.
val collationOrdering: Ordering[UTF8String] =
  Ordering.comparatorToOrdering(CollationFactory.fetchCollation(collationId).comparator)

val treeMap = buffer.foldLeft(mutable.TreeMap.empty[UTF8String, Long](collationOrdering)) {
  case (map, (key: String, count)) =>
    val utf8Key = UTF8String.fromString(key)
    map(utf8Key) = map.getOrElse(utf8Key, 0L) + count
    map
}

After which the map was sorted by count and then lexicographically, and the head of the list was chosen. 
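
A minimal sketch of that selection step, assuming the treeMap built above (UTF8String.toString supplies the lexicographic tiebreak):

scala
// Highest count wins; ties are broken lexicographically on the key.
val mode: Option[UTF8String] = treeMap.toSeq
  .sortBy { case (key, count) => (-count, key.toString) }
  .headOption
  .map { case (key, _) => key }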

Reflections on Using TreeMap

Benchmarks revealed the high overhead of constructing a new data structure, or at least this one.

Some of the practical challenges that concerned me about this approach still concern me: handling high-cardinality datasets and the absence of parallelized bulk operations.

Example Use-Case

Consider the following SQL query, which demonstrates the practical implications of collation support:

sql
SELECT mode(col) FROM VALUES ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b') AS tab(col);

With UTF8_BINARY collation, the query returns 'a' (count 3), since 'B' and 'b' are distinct under binary comparison. With UTF8_BINARY_LCASE collation, however, 'B' and 'b' compare as equal, for a combined count of 4, so the result is 'B' or 'b', depending on which representative of the group the system surfaces.
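
The effect can be reproduced in plain Scala by swapping the TreeMap's ordering; here String.CASE_INSENSITIVE_ORDER stands in for UTF8_BINARY_LCASE:

scala
import scala.collection.mutable

val values = Seq("a", "a", "a", "B", "B", "b", "b")

// Binary ordering: 'B' and 'b' are distinct keys, so 'a' wins with count 3.
val binary = mutable.TreeMap.empty[String, Long]
values.foreach(v => binary(v) = binary.getOrElse(v, 0L) + 1)
println(binary) // TreeMap(B -> 2, a -> 3, b -> 2)

// Case-insensitive ordering: 'B' and 'b' collapse into one key with count 4.
val lcase = mutable.TreeMap.empty[String, Long](
  Ordering.comparatorToOrdering(String.CASE_INSENSITIVE_ORDER))
values.foreach(v => lcase(v) = lcase.getOrElse(v, 0L) + 1)
println(lcase) // TreeMap(a -> 3, B -> 4): the first-seen representative 'B' survives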

Data Structures Already Used By Mode

The TypedImperativeAggregate interface extends ImperativeAggregate functionality by allowing the aggregation buffer to be a user-defined Java object. This flexibility enables custom aggregation functions tailored to specific data types.

TypedImperativeAggregate can optimize performance based on data characteristics, significantly reducing JVM boxing and unboxing overhead. We will discuss the memory efficiency of using OpenHashMap.

The way these structures are used supports distributed computing environments, with efficient serialization and deserialization suitable for horizontal scaling. The parallel model centers on three functions: update, merge, and eval. Put simply and archaically, update is to Map what merge is to Reduce, with eval producing the final result, as sketched below.
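
A toy sketch of the three phases on plain Scala maps; these are hypothetical stand-ins for illustration, not Spark's actual TypedImperativeAggregate signatures:

scala
// update: fold one input row into a partial buffer (map side).
def update(buf: Map[String, Long], row: String): Map[String, Long] =
  buf.updated(row, buf.getOrElse(row, 0L) + 1L)

// merge: combine two partial buffers from different partitions (reduce side).
def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

// eval: produce the final mode from the fully merged buffer.
def eval(buf: Map[String, Long]): Option[String] =
  buf.toSeq.sortBy { case (k, v) => (-v, k) }.headOption.map(_._1)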

The TypedAggregateWithHashMapAsBuffer implementation of TypedImperativeAggregate maintains a map from keys to longs, representing counts in current use-cases. Ideally, a generic TypedAggregateWithHashMapAsBuffer over a generic HashMap would be used, but in this case the type parameters are fixed to AnyRef keys mapping to Long values. The OpenHashMap data structure is chosen for its efficiency with mutable keys and values, allowing the rapid access and update operations essential for aggregation tasks.
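
For reference, counting with OpenHashMap looks roughly like this. This is a sketch that would live inside Spark's own codebase (OpenHashMap is internal to Spark), using its changeValue method, which folds insert-or-update into a single probe:

scala
import org.apache.spark.util.collection.OpenHashMap

val counts = new OpenHashMap[AnyRef, Long]()
Seq("a", "a", "B").foreach { key =>
  // changeValue inserts the default on a miss and applies the merge
  // function on a hit, avoiding a separate get-then-put.
  counts.changeValue(key, 1L, _ + 1L)
}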

Operations and Relevance

Space Complexity:

  • Space: O(n)

Time Complexity:

  • Search: O(log n) (amortized and worst-case)
  • Insert: O(log n) (amortized and worst-case)
  • Delete: O(log n) (amortized and worst-case)

Deletion: Not used in this context.

Searching: Relevant, especially since each insertion begins with a getOrElse lookup, giving one search per collation-unaware key. For example, if the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b'), then n = 3 distinct binary keys reach the TreeMap, for O(n log n) total search cost.

Insertion: Relevant, as each insertion may require rebalancing the tree, involving multiple rotations and color changes. While these operations are logarithmic in time complexity, the constants involved can be significant, especially with large datasets. In this implementation there are n inserting operations, but the tree only ever holds k entries, where k is the number of collation-aware keys; if the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b'), then k = 2. Using collation keys with a hash-based data structure instead would confine comparison work to the m elements whose hash codes collide (see the sketch following this list).

Bulk Insertion: Would be relevant if supported by TreeMap. However, Scala's TreeMap has no built-in method optimized for bulk insertion, which could be seen as an oversight, since bulk operations could be optimized to reduce the number of rebalancings. Instead, it relies on individual insertions, incurring repeated rebalancing as elements are added one by one.
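
Here is a sketch of the collation-key idea mentioned above, on plain Scala collections; toLowerCase stands in for a real collation key function (e.g., an ICU CollationKey):

scala
// Mapping every string through a collation key lets an ordinary hash-based
// structure group collation-equal values without a comparison-based tree.
val data = Seq("a", "a", "a", "B", "B", "b", "b")
val counts = data.foldLeft(Map.empty[String, Long]) { (m, v) =>
  val key = v.toLowerCase // stand-in for a real collation key
  m.updated(key, m.getOrElse(key, 0L) + 1L)
}
println(counts) // Map(a -> 3, b -> 4)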

Parallel algorithms for constructing red-black trees from sorted lists of items can run in constant or O(log log n) time, depending on the computer model and the number of processors available. However, since sorting itself is O(n log n), we gain no asymptotic benefit from a bulk insert that requires sorted input.

Throughout this process, I considered that sometimes the cardinality of elements for the mode is low, even in large datasets. For example, a 20 TB dataframe with billions of rows might have a trivial TreeMap size by the time Spark calls Mode.eval(). However, high-cardinality cases illustrate that constructing a TreeMap with billions of keys can consume significant space and time, potentially overloading the driver node without any parallelization.

Ultimately, it is not scalable to perform all this work inside the eval function on the driver, since a data structure built there is never serialized or deserialized and cannot be shuffled or persisted. The next step would be to change the Mode function to rely on a new implementation of TypedImperativeAggregate that uses a TreeMap or another comparison-based map, rather than a HashMap.

Next Steps and Preview

The upcoming post will cover the second approach we tried: modifying OpenHashMap and OpenHashSet.


Further Reading:

Unicode Collation: https://www.unicode.org/reports/tr10/#Common_Misperceptions

