Contributing to Apache Spark Part 1/4

Contributing to Apache Spark: Implementing Mode with TreeMap, Part 1/4

Introduction

In this four-part series, I will share my journey contributing to Apache Spark, specifically focusing on the implementation of the ‘Mode’ function. This first post will detail the initial prototype using a scala.collection.mutable.TreeMap.

Collations and Their Importance

Collations are systems for arranging data, often considering cultural and linguistic nuances. For example, the collations used in the yellow pages varied:

1. Some gave surnames beginning with Mac and Mc no special treatment.

2. Others treated Mc as if it were Mac (interleaving the two) but kept Mac/Mc in the M section (after Mabrey).

3. And many placed Mac and Mc before the rest of the M section, as either two separate sections or interleaved.

Fundamental Collations

The fundamental groups of collations for basic database support are:

  1. Unicode Collation Algorithm (UCA), which is not code point order.
  2. UTF-8 byte order, which compares encoded bytes rather than code points (although for valid UTF-8 the two comparisons give the same ordering).

Additional fundamental collations include:

  1. Case-insensitive, and/or accent-insensitive, and/or LOCALE specification, applied to UCA
  2. Case-insensitive UTF8 Byte order

Background: Alphabetical Order

In mathematics and linguistics, alphabets are sets of letters. In common usage, an alphabet implies an ordered list due to alphabetical order. The Proto-Sinaitic script (c. 19th–16th century BCE) evolved into the Phoenician alphabet, which is the ancestor of most alphabets in use today. It is not clear to what extent alphabetical ordering of library scrolls was used before the Greeks, but the alphabet and its order were known and widely practiced by scribes.

Byte Order and Roman Numerals

Automated collation often relies on numerical codes of symbols, such as ASCII or Unicode, ordering characters by increasing numerical codes and extending this to strings lexicographically. For instance, a program might sort characters $, C, a, b, d as $ (36), C (67), a (97), b (98), d (100), deviating from standard alphabetical order with capitals before lowercase.
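As a small illustration of the paragraph above, here is a Python sketch (Python strings compare by code point by default). The variable names are mine, and the case-insensitive key is just one simple way to get closer to alphabetical order, not a full collation.

```python
# Sort the characters from the text by Unicode code point.
chars = ["a", "b", "C", "$", "d"]
by_code_point = sorted(chars)
print([(c, ord(c)) for c in by_code_point])
# [('$', 36), ('C', 67), ('a', 97), ('b', 98), ('d', 100)]

# A case-insensitive sort key puts 'C' back between 'b' and 'd'.
alphabetical = sorted(chars, key=str.casefold)
print(alphabetical)
# ['$', 'a', 'b', 'C', 'd']
```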

For ASCII characters, byte order and code point order agree, because each character is a single byte whose value equals its code point. UCA order differs even here: it places "a" before "B", for instance.

While none of the collations are defined by code point, it is interesting to compare byte order collation with lexicographic order of code points:

Byte Order Collation:

Characters are compared byte by byte according to their UTF-8 encoding.

The order is determined by the binary values of the bytes.

Lexicographic Order of Code Points:

Characters are compared by their Unicode code points.

The order is determined by the numerical values of the code points.


Example String Comparison:


Consider the strings “Aßa” and “Aaß”:

“A” has the code point U+0041 and is encoded as 0x41 in UTF-8.

“ß” (sharp S) has the code point U+00DF and is encoded as 0xC3 0x9F in UTF-8.

“a” has the code point U+0061 and is encoded as 0x61 in UTF-8.


String 1: “Aßa”

UTF-8 encoding: 0x41 0xC3 0x9F 0x61


String 2: “Aaß”

UTF-8 encoding: 0x41 0x61 0xC3 0x9F


Byte Order Collation:

1. Compare the first byte of each string:

Both start with 0x41 (“A”), so they are equal.

2. Compare the second byte:

String 1: 0xC3

String 2: 0x61

0xC3 (195 in decimal) > 0x61 (97 in decimal)


Since 0xC3 > 0x61, in byte order collation, “Aßa” > “Aaß”.


Lexicographic Order of Code Points:

1. Compare the first character of each string:

Both are “A” (U+0041), so they are equal.

2. Compare the second character:

String 1: “ß” (U+00DF)

String 2: “a” (U+0061)

U+00DF (223 in decimal) > U+0061 (97 in decimal)


Since U+00DF > U+0061, in lexicographic order of code points, “Aßa” > “Aaß”.
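The comparison above can be checked with a few lines of Python. This is only a sketch of the two comparisons for this one pair of strings; the variable names are my own.

```python
s1, s2 = "Aßa", "Aaß"

# UTF-8 encodings of both strings.
utf8_1 = s1.encode("utf-8")
utf8_2 = s2.encode("utf-8")
print(utf8_1.hex(" "), "|", utf8_2.hex(" "))
# 41 c3 9f 61 | 41 61 c3 9f

# Byte order: bytes objects compare lexicographically, byte by byte.
byte_order_gt = utf8_1 > utf8_2

# Code point order: compare the lists of code points lexicographically.
code_points_1 = [ord(c) for c in s1]
code_points_2 = [ord(c) for c in s2]
code_point_gt = code_points_1 > code_points_2

print(byte_order_gt, code_point_gt)
# True True
```

Both comparisons agree here, and the second byte (0xC3 vs. 0x61) or second code point (U+00DF vs. U+0061) decides the result in each case.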

Unicode Collation Algorithm

The Unicode Collation Algorithm is a standard for collating strings composed of Unicode symbols. It can be tailored for specific languages by adjusting its default collation table, with several tailorings collected in the Common Locale Data Repository.

In some applications, collation strings differ from displayed identifiers. For instance, "The Shining" might be sorted as "Shining, The." These collation strings, called sort keys, will be explored further in parts 3 and 4. Common-word rules, such as moving “The” to the end of a movie title, are out of scope for UCA and for any collation used by databases, though the same effect can be achieved with a sort column separate from “displayed_movie_title”.
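To make the idea of a separate sort column concrete, here is a minimal Python sketch. The helper title_sort_key and the list of articles are hypothetical, invented for this illustration; they are not part of UCA or of any database collation.

```python
def title_sort_key(displayed_title: str) -> str:
    """Hypothetical helper: move a leading article to the end, then casefold."""
    for article in ("The ", "An ", "A "):
        if displayed_title.startswith(article):
            rest = displayed_title[len(article):]
            return f"{rest}, {article.strip()}".casefold()
    return displayed_title.casefold()

titles = ["The Shining", "Alien", "A Quiet Place", "Shrek"]

# Sort by the derived key while still displaying the original titles.
ordered = sorted(titles, key=title_sort_key)
print(ordered)
# ['Alien', 'A Quiet Place', 'The Shining', 'Shrek']
```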

TreeMap in Mode Function Implementation

The choice to prototype the Mode function using a TreeMap stemmed from its ability to maintain elements in sorted order according to a comparator, essential when collation settings affect string data equality. TreeMap uses a Red-Black Tree (RB Tree), ensuring balanced operations with logarithmic time complexity for insertions, deletions, and searches.

Technical Discussion: TreeMap and Mode Function

TreeMap was considered for several reasons:

  • Support for Ordered Comparisons: TreeMap's comparator determines key equality, so keys that are equal under the collation's comparator share a single count, even when they are not equal under the physical equality that hash maps use by default.

Code Example

Here's how TreeMap was utilized in the proposed Mode function:

```scala
// Fragment: the beginning of the expression is not shown.
    Ordering.comparatorToOrdering(
      CollationFactory.fetchCollation(collationId).comparator)) {
  case (map, (key: String, count)) =>
    map(org.apache.spark.unsafe.types.UTF8String.fromString(key)) =
      map.getOrElse(org.apache.spark.unsafe.types.UTF8String.fromString(key), 0L) + count
}
```

The map was then sorted by count and then lexicographically, and the head of the list was chosen.

Reflections on Using TreeMap

Benchmarks revealed high overhead of constructing a new data structure, or at least this one. 

Some of the practical challenges that concerned me about this approach still concern me: handling high cardinality datasets and the absence of parallelized bulk operations. 

Example Use-Case

Consider the following SQL query, which demonstrates the practical implications of collation support:

```sql
SELECT mode(col) FROM VALUES ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b') AS tab(col);
```

With UTF8_BINARY collation, the query should return 'a', reflecting a simple binary comparison in which 'a' (3 occurrences) outnumbers 'B' (2) and 'b' (2). With UTF8_BINARY_LCASE collation, 'B' and 'b' compare equal and their combined count of 4 exceeds 3, so the result is 'B' or 'b'; which of the two is returned depends on which representative the implementation keeps.
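The effect can be simulated outside Spark with a short Python sketch. It is not the Spark implementation: str.lower merely stands in for UTF8_BINARY_LCASE, the mode function is my own, and it returns the lowercased key as the representative, which is one arbitrary choice among several.

```python
from collections import Counter

rows = ["a", "a", "a", "B", "B", "b", "b"]

def mode(values, collation_key=lambda s: s):
    """Return (key, count) for the most frequent key under collation_key."""
    counts = Counter(collation_key(v) for v in values)
    # Sort by count descending, then by key, and take the head of the list.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[0]

binary_mode = mode(rows)                          # stands in for UTF8_BINARY
lcase_mode = mode(rows, collation_key=str.lower)  # stands in for UTF8_BINARY_LCASE

print(binary_mode)  # ('a', 3)
print(lcase_mode)   # ('b', 4)
```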

Data Structures Already Used By Mode

The TypedImperativeAggregate interface extends ImperativeAggregate functionality by allowing the aggregation buffer to be a user-defined Java object. This flexibility enables custom aggregation functions tailored to specific data types.

TypedImperativeAggregate can optimize performance based on data characteristics, significantly reducing JVM boxing and unboxing overhead. We will discuss the memory efficiency of using OpenHashMap.

The way these structures are used supports distributed computing environments, with efficient serialization and deserialization, suitable for horizontal scaling. The parallel model entails three functions: update, merge, and eval. Put simply and archaically, update is to Map what merge is to Reduce, with eval producing the final result from the merged buffer.

The TypedAggregateWithHashMapAsBuffer implementation of TypedImperativeAggregate maintains a map from keys to longs, representing counts in current use-cases. Ideally, a generic TypedAggregateWithHashMapAsBuffer with a generic HashMap would be used, but in this case, generics are defined as AnyRef mapping to a Long. The OpenHashMap data structure is chosen for its efficiency with mutable keys and values, allowing rapid access and update operations essential for aggregation tasks.
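The shape of this buffer-based aggregation can be sketched in plain Python. This is an analogy only: a Counter stands in for the OpenHashMap buffer, the function names mirror the update, merge, and eval steps of TypedImperativeAggregate but are my own, and the two lists stand in for partitions processed independently.

```python
from collections import Counter
from typing import Optional

def create_buffer() -> Counter:
    return Counter()

def update(buffer: Counter, value: str) -> Counter:
    # Per-row step: bump the count for this key.
    buffer[value] += 1
    return buffer

def merge(left: Counter, right: Counter) -> Counter:
    # Combine two partial buffers; Counter.update adds the counts.
    left.update(right)
    return left

def evaluate(buffer: Counter) -> Optional[str]:
    # Final step: pick the key with the highest count.
    if not buffer:
        return None
    return max(buffer.items(), key=lambda kv: kv[1])[0]

partitions = [["a", "a", "B"], ["a", "B", "b", "b"]]

# Each partition builds its own buffer independently.
partials = []
for part in partitions:
    buf = create_buffer()
    for v in part:
        update(buf, v)
    partials.append(buf)

# The partial buffers are merged, then evaluated once.
final = create_buffer()
for p in partials:
    merge(final, p)
result = evaluate(final)
print(dict(final), result)
```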

Operations and Relevance

Space Complexity:

  • Space: O(n)

Time Complexity:

  • Search: O(log n) (amortized and worst-case)
  • Insert: O(log n) (amortized and worst-case)
  • Delete: O(log n) (amortized and worst-case)

Deletion: Not used in this context.

Searching: Relevant, especially since each insertion involves a getOrElse operation, leading to n searches for n collation-unaware keys. For example, if the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b'), then n = 3, and the n searches cost O(n log n) comparisons in the worst case.

Insertion: Relevant, as each insertion operation requires rebalancing the tree, involving multiple rotations and color changes. While these operations are logarithmic in time complexity, the constants involved can be significant, especially with large datasets. For this implementation, there are k rebalancing insertions, with k being the number of collation-aware keys. If the dataframe is ('a'), ('a'), ('a'), ('B'), ('B'), ('b'), ('b'), then k = 2. Using collation keys and a hash-based data structure would make k linearly related to m, where m is the number of elements with colliding hashcodes.

Bulk Insertion: Would be relevant if supported by TreeMap. However, TreeMap in Scala has no built-in method specifically optimized for bulk insertion, which could be seen as an oversight. Instead, it relies on individual insertions, leading to multiple rebalancings when adding elements one by one. A dedicated bulk operation could potentially reduce the number of rebalancings.

Parallel algorithms for constructing red-black trees from sorted lists of items can run in constant time or O(log log n) time, depending on the computer model and the number of processors available. However, since sorting is O(n log n), we do not asymptotically benefit from a bulk insert requiring sorted order.

Throughout this process, I considered that sometimes the cardinality of elements for the mode is low, even in large datasets. For example, a 20 TB dataframe with billions of rows might have a trivial TreeMap size by the time Spark calls Mode.eval(). However, high cardinality cases illustrate that constructing a TreeMap with billions of keys can consume significant space and time, potentially overloading the master node and lacking parallelization.

Ultimately, it is not scalable to perform all this work inside the eval function on the master node, as the data structure is not serialized or deserialized, and cannot be shuffled or persisted. The next step would involve changing the Mode function to rely on a new implementation of TypedImperativeAggregate using a TreeMap or another comparison-based map, rather than a HashMap.

Next Steps and Preview

The upcoming post will be about modifying OpenHashMap and OpenHashSet, the second approach we tried.


Further Reading:

Unicode Collation: https://www.unicode.org/reports/tr10/#Common_Misperceptions


Bigtable

LSM-Tree. Scalability discussion. Bigtable: A Distributed Storage System for Structured Data.

Why Should Code Reviews Be Fast?

In general, reviewers should favor approving a CL once it is in a state where it definitely improves the overall code health of the system being worked on, even if the CL isn’t perfect. That is the senior principle among all of the code review guidelines.

When code reviews are slow, several things happen:

  • The velocity of the team as a whole is decreased. Yes, the individual who doesn’t respond quickly to the review gets other work done. However, new features and bug fixes for the rest of the team are delayed by days, weeks, or months as each CL waits for review and re-review.
  • Developers start to protest the code review process. If a reviewer only responds every few days, but requests major changes to the CL each time, that can be frustrating and difficult for developers. Often, this is expressed as complaints about how “strict” the reviewer is being. If the reviewer requests the same substantial changes (changes which really do improve code health), but responds quickly every time the developer makes an update, the complaints tend to disappear. Most complaints about the code review process are actually resolved by making the process faster.
  • Code health can be impacted. When reviews are slow, there is increased pressure to allow developers to submit CLs that are not as good as they could be. Slow reviews also discourage code cleanups, refactorings, and further improvements to existing CLs.


Two Good Blogs

Two of my favorites


https://www.brendangregg.com/ Industry expert in performance and co-author of the DTrace book

https://spdustin.substack.com/ Author of the Auto-expert prompt-context for getting better ChatGPT responses




ML Success Metrics specific to Regression

Machine Learning Metrics Analysis

Understanding RMSE and RMSLE

According to the Official Google Cloud Certified Professional Machine Learning Engineer Study Guide:

The root‐mean‐squared error (RMSE) is the square root of the average squared difference between the target and predicted values. If you are worried that your model might incorrectly predict a very large value and want to penalize the model, you can use this. Ranges from 0 to infinity.

The root‐mean‐squared logarithmic error (RMSLE) metric is similar to RMSE, except that it uses the natural logarithm of the predicted and actual values +1. This is an asymmetric metric, which penalizes underprediction (value predicted is lower than actual) rather than overprediction.

Mona, Mona; Ramamurthy, Pratap. Official Google Cloud Certified Professional Machine Learning Engineer Study Guide (Sybex Study Guide) (p. 12). Wiley. Kindle Edition.

Practical Testing

Upon testing what I thought this meant, here is what I saw:

```python
from sklearn.metrics import root_mean_squared_error, root_mean_squared_log_error

y_true = [60, 80, 90, 750]
y_pred = [67, 78, 91, 102]

# Swapping the arguments does not change either metric.
root_mean_squared_log_error(y_pred=y_true, y_true=y_pred)  # 0.9949158238939428
root_mean_squared_log_error(y_pred=y_pred, y_true=y_true)  # 0.9949158238939428
root_mean_squared_error(y_pred, y_true)                    # 324.02083266358045
root_mean_squared_error(y_true, y_pred)                    # 324.02083266358045
```

Confusion and Clarification

The results were confusing for a couple of reasons:

  1. If RMSE is for when "your model might incorrectly predict a very large value and want to penalize the model", then shouldn't it be an asymmetric metric, as the authors allege that RMSLE is?
  2. Why are these two metrics, one of which is allegedly asymmetric, and the other presumably asymmetric, obviously symmetric metrics?

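The answer is that two different notions of symmetry are in play. Swapping the y_true and y_pred arguments cannot change either metric, because each one squares a difference. The asymmetry the authors describe appears when the actual value is held fixed: RMSLE compares logarithms, so predicting d below the actual value produces a larger error than predicting d above it, whereas RMSE penalizes both equally.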
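A short Python sketch makes the distinction visible. The rmse and rmsle functions below are my own plain-math versions written from the definitions quoted above (log of value + 1), not the scikit-learn implementations, and the numbers are made up for illustration.

```python
import math

def rmse(y_true, y_pred):
    terms = [(p - t) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(terms) / len(terms))

def rmsle(y_true, y_pred):
    # log1p(x) is log(1 + x), matching the "+1" in the definition.
    terms = [(math.log1p(p) - math.log1p(t)) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(terms) / len(terms))

actual = [100.0]
under = [50.0]   # 50 below the actual value
over = [150.0]   # 50 above the actual value

rmse_under, rmse_over = rmse(actual, under), rmse(actual, over)
rmsle_under, rmsle_over = rmsle(actual, under), rmsle(actual, over)

print(rmse_under, rmse_over)    # equal: RMSE treats both misses the same
print(rmsle_under, rmsle_over)  # the underprediction scores higher

# Swapping the two arguments leaves the value unchanged.
print(math.isclose(rmsle(actual, under), rmsle(under, actual)))
```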

Parity of the number of bits == 1

Basic Parity Calculation

The function parity(n: int) -> int calculates the parity of the number n. Parity is 1 if the number of 1s in the binary representation of n is odd, and 0 otherwise. The function iterates over each bit of n, using result ^= n & 1 to flip the result if the current bit is 1, and then right-shifts n by one bit.

```python
def parity(n: int) -> int:
    result = 0
    while n:
        result ^= n & 1  # flip the result if the lowest bit is 1
        n >>= 1          # move on to the next bit
    return result
```

Complexity: The time complexity is O(m), where m is the number of bits in the number. This is because the algorithm checks each bit exactly once.

Kernighan's Algorithm for Parity Calculation

The function kernigan_parity(n: int) -> int also calculates the parity of the number n. It optimizes the basic approach by directly skipping to the next set bit using the operation n &= n - 1, which drops the lowest set bit of n. The result is XORed with 1 for each set bit found.

```python
def kernigan_parity(n: int) -> int:
    result = 0
    while n:
        result ^= 1
        n &= n - 1  # drop the lowest set bit
    return result
```

Complexity: The time complexity is O(k), where k is the number of bits set to 1 in the number. This improvement over the basic method occurs because it processes only the bits that are 1, skipping all zeros.
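To see the O(m) versus O(k) difference directly, the loops can be instrumented to count iterations. This is a sketch: the two functions below are copies of the ones above with a step counter added, and the test value is one I picked with m = 48 bits and k = 3 set bits.

```python
def parity_steps(n: int):
    """Basic parity, also returning the number of loop iterations."""
    result, steps = 0, 0
    while n:
        result ^= n & 1
        n >>= 1
        steps += 1
    return result, steps

def kernighan_parity_steps(n: int):
    """Kernighan's parity, also returning the number of loop iterations."""
    result, steps = 0, 0
    while n:
        result ^= 1
        n &= n - 1  # drop the lowest set bit
        steps += 1
    return result, steps

n = (1 << 47) | (1 << 20) | 1  # 48-bit number with 3 set bits

basic = parity_steps(n)
kernighan = kernighan_parity_steps(n)
print(basic)      # (1, 48): one iteration per bit
print(kernighan)  # (1, 3): one iteration per set bit
```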

I benchmarked it as follows:

First, I had to generate example inputs for the different (k, m) combinations. I did that like so:

```python
from random import Random


class ParityBenchmarking:

    @staticmethod
    def get_inputs(k: int, m: int) -> list:
        # k is the number of bits that are 1
        # m is the bit length
        return ParityBenchmarking.gen_random_numbers(k, m, 25)

    @staticmethod
    def gen_random_number(k, m):
        # Start from m random bits, then set or clear random positions
        # until exactly k bits are set (requires 0 <= k <= m).
        r = Random()
        randbits = r.getrandbits(m)
        while bin(randbits).count('1') != k:
            random_num_between_0_and_m = r.randint(0, m - 1)
            if bin(randbits).count('1') < k:
                randbits |= 1 << random_num_between_0_and_m
            else:
                randbits &= ~(1 << random_num_between_0_and_m)
        return randbits

    @staticmethod
    def gen_random_numbers(k, m, numNumbers) -> list:
        return [ParityBenchmarking.gen_random_number(k, m) for _ in range(numNumbers)]
```

Benchmarking Code:

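```python
import datetime
from datetime import timedelta
from typing import Callable, Dict


class Benchmarker:  # name taken from the Benchmarker.benchmark call below

    @staticmethod
    def benchmark(my_func: Callable[[int], int], inputs: list, n: int = 5000) -> timedelta:
        start_time = datetime.datetime.now()
        for _ in range(n):
            for input_val in inputs:
                my_func(input_val)
        end_time = datetime.datetime.now()  # stop the clock after the last call
        return end_time - start_time

    # Enclosing class is not shown in the source; placed on Benchmarker here.
    @staticmethod
    def print_benchmark3(k: int, m: int, benchmarks: Dict, my_func: Callable[[int], int], impl: str) -> None:
        inputs = ParityBenchmarking.get_inputs(k, m)
        benchmark_key = (k, m, impl)
        # Total elapsed microseconds; timedelta.microseconds alone holds only
        # the sub-second component and wraps for runs longer than one second.
        duration = Benchmarker.benchmark(my_func, inputs=inputs) / timedelta(microseconds=1)
        print(f"Input: {inputs} ({benchmark_key}), Time taken: {duration} microseconds")
        benchmarks[benchmark_key] = duration
```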

Results:

The summarized output from the benchmarking data displays the walltime measurements for computing parity using the Kernighan and standard parity algorithms over various configurations of 'k' (the number of set bits) and 'm' (the total number of bits). The walltime generally increases with both higher values of 'k' and 'm'.

For the **Kernighan parity algorithm**, the results show a clear trend where the walltime increases with both the number of set bits 'k' and the total number of bits 'm'. The increase in walltime is more pronounced with an increase in 'k' compared to an increase in 'm'. For example, as 'k' increases from 1 to 32 at a fixed 'm', the walltime shows significant growth. Similarly, for a fixed 'k', increasing 'm' also results in increased walltime, but the effect is generally less dramatic than changes in 'k'.

For the **standard parity calculation** (simply labeled "parity" in the table), the results also show an increase in walltime with increases in 'k' and 'm', but the growth pattern appears less systematic than with the Kernighan method.






Correlations

Correlation between 'k' and walltime at each fixed 'm':

| m | algorithm | correlation |
|---------|-----------------|---------------|
| 48 | parity | 0.777746 |
| 148 | parity | 0.693694 |
| 248 | parity | -0.590393 |
| 348 | parity | 0.369503 |
| 448 | parity | -0.459705 |
| 548 | parity | 0.388659 |
| 48 | kernigan_parity | 0.999983 |
| 148 | kernigan_parity | 0.999875 |
| 248 | kernigan_parity | 0.999627 |
| 348 | kernigan_parity | 0.999851 |
| 448 | kernigan_parity | 0.999986 |
| 548 | kernigan_parity | 0.999858 |



Correlation between 'm' and walltime at each fixed 'k':

| k | algorithm | correlation |
|---------|-----------------|---------------|
| 1 | parity | 0.337478 |
| 2 | parity | 0.0487518 |
| 4 | parity | -0.387477 |
| 8 | parity | 0.317987 |
| 16 | parity | 0.348491 |
| 32 | parity | 0.0311046 |
| 1 | kernigan_parity | 0.936622 |
| 2 | kernigan_parity | 0.981157 |
| 4 | kernigan_parity | 0.395702 |
| 8 | kernigan_parity | 0.986231 |
| 16 | kernigan_parity | 0.994522 |
| 32 | kernigan_parity | 0.603758 |

- For **Kernighan's parity**:

  - The correlation between 'k' and walltime is very strong (above 0.999 at every 'm'), suggesting that 'k' is a reliable predictor of performance for this algorithm.

  - The relationship between 'm' and walltime is also positive but weaker and less consistent: above 0.93 for most values of 'k', but only about 0.40 at k = 4 and 0.60 at k = 32.

- For **standard parity**:

  - The correlation between 'k' and walltime is much weaker and inconsistent in sign, ranging from about -0.59 to 0.78, suggesting less predictability.

  - The correlation between 'm' and walltime is weak, with coefficients between about -0.39 and 0.35.

In summary, the Kernighan algorithm’s performance is more consistently influenced by the number of set bits and the total number of bits, showing predictably higher times with increases in these parameters. The standard parity algorithm exhibits increased times with larger 'k' and 'm' but with more variability and generally less efficiency compared to Kernighan’s method.

Statistical Analysis

From the regression and correlation coefficients, it's clear that the correlation between 'k' and walltime is very strong for Kernighan's parity, suggesting that 'k' is a reliable predictor of performance for this algorithm. The relationship between 'm' and walltime is positive but weaker. For the standard parity calculation, the correlations are much weaker and not consistently positive, indicating less predictability.
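For readers who want to reproduce this kind of coefficient, here is a minimal sketch of a Pearson correlation in plain Python. The pearson function is my own, and the walltime numbers are made up for illustration; they are not measurements from this benchmark.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

ks = [1, 2, 4, 8, 16, 32]

# Made-up walltimes (NOT benchmark results): one grows linearly with k,
# the other shows no clear trend.
linear_times = [10 + 3 * k for k in ks]
noisy_times = [40, 35, 52, 38, 47, 41]

r_linear = pearson(ks, linear_times)
r_noisy = pearson(ks, noisy_times)
print(round(r_linear, 6), round(r_noisy, 6))
```

A coefficient near 1 means walltime rises in step with 'k'; a coefficient near 0 means 'k' tells you little about walltime.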


Profiling

I used Python's memory-profiler (`@profile`). Unfortunately, I couldn't use perf, which is not available on macOS, and I didn't delve into using DTrace.


Questions


1. How can a bit string, in practical systems, keep growing without running into overflow, considering limited word sizes in CPUs?

   - Some modern programming environments support extended or arbitrary-precision arithmetic that handles integers larger than a CPU's standard word size.


2. Does the single assembly instruction for decrementing a register/address alter the computational complexity?

   - Both basic and Kernighan's algorithms operate at the bit level. The difference in computational complexity (O(m) vs. O(k)) does manifest in practical performance benefits, as fewer operations are needed in Kernighan's method when fewer bits are set.
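On the first question, Python's built-in int is one example of such an arbitrary-precision type, which is why the benchmark inputs above can be hundreds of bits wide. A tiny sketch, with a value chosen only for illustration:

```python
# Build an integer far wider than a 64-bit machine word.
n = 1 << 200
n |= 1  # also set the lowest bit

bit_length = n.bit_length()   # number of bits needed to represent n
set_bits = bin(n).count("1")  # number of bits equal to 1

print(bit_length, set_bits)
# 201 2
```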


Further exploration could include leveraging CPU-specific features to see if they substantially alter the computational complexity or the practical efficiency of these algorithms and understanding how compiler optimizations impact operational speed and efficiency in different computing environments.



Quote

 

“No amount of experimentation can ever prove me right; a single experiment can prove me wrong.”


IYKYK

https://gist.github.com/GideonPotok/9d8de616ee20571d1d38ea760c5b99a2