40+ Apache Spark best practices & optimisation interview FAQs – Part-1

There are many different ways to solve a given big data problem in Spark, but some approaches can degrade performance or lead to memory issues. Here are some best practices to keep in mind when writing Spark jobs.

#1 Favor DataFrames, Datasets, or SparkSQL over RDDs: Spark DataFrames, Datasets and SparkSQL are optimised, hence faster than RDDs, especially when working with structured data. Use RDDs when you need to perform low-level transformations on unstructured data. Whilst RDDs are very powerful and have many advantages, it is easy to build inefficient transformations with them. DataFrames provide a higher level of abstraction to query and manipulate structured data, and Spark figures out the most efficient way to execute your query by converting your logical plan into an optimised physical plan.

SparkSQL is a Spark module for structured data processing. You can interact with SparkSQL through SQL, DataFrames API and Datasets API.

Spark execution plan (source: https://www.learntospark.com/2020/02/spark-sql-catalyst-optimizer.html)

Datasets, DataFrames, and Spark SQL provide the following advantages over RDDs:

1) Compact columnar memory format & direct memory access
2) Reduced GC overhead
3) Catalyst query optimisation

Where possible, use Spark SQL functions like year(date), month(date), concat('Scala', 'Spark'), etc. instead of custom UDFs (i.e. User Defined Functions) in order to benefit from the Catalyst query optimiser.
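For instance, a minimal PySpark sketch (the single-row DataFrame is illustrative) that relies on built-in functions, which Catalyst can optimise, rather than an opaque Python UDF:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, month, concat, lit

spark = SparkSession.builder.appName("builtin-vs-udf").getOrCreate()

df = spark.createDataFrame([("2023-06-15",)], ["order_date"]) \
          .withColumn("order_date", to_date("order_date"))

# built-in functions stay inside the Catalyst-optimised plan
df.select(year("order_date").alias("yr"),
          month("order_date").alias("mth"),
          concat(lit("Scala"), lit("Spark")).alias("merged")).show()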

#2 Decide on Scala vs. Java vs. Python: Apache Spark supports all three languages. You need to consider the programming skill levels of your current team, performance, ease of use and ease of learning/mastering.

Compile-time Versus runtime error checking

Since Spark itself is written in Scala, any new features added to the Spark API are available in Scala first, and then Java. Scala and Java are statically typed languages, which helps you catch bugs early at compile time, whereas Python is a dynamically typed language where you have to wait until runtime.

SparkSQL Vs Dataframe Vs Dataset – https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

From a learning-curve perspective, Scala's syntax can be difficult for many developers to master compared to Python and Java, but once you get the hang of it you can write more concise and elegant functional code in Scala.

Python has the main advantage if you are doing data science, as it provides the user with a lot of great tools (e.g. Spark MLlib, Pandas, Scikit-learn, etc.) for machine learning and natural language processing. Also, many data scientists are already skilled in Python.

#3 Avoid collect() on large RDDs: collect() on any RDD drags all the data back to the Spark driver from all the executors, which can cause the driver to run out of memory and crash. To inspect only a subset of the data, use take() or takeSample(), or aggregating actions such as countByKey(), countByValue() or collectAsMap().

For example, if you want to check whether a DataFrame is empty, don't count. Use take(n), head() or head(n) instead:
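A minimal sketch (df can be any DataFrame) of cheap emptiness checks that fetch at most one row to the driver:

is_empty = len(df.take(1)) == 0      # fetches at most one row
is_empty = len(df.head(1)) == 0      # equivalent
# Spark 3.3+ also offers df.isEmpty(); avoid df.count() == 0, which scans the entire dataset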

If you really have a requirement to look at the complete data, you can write the DataFrames/Datasets out to files or to a NoSQL database like HBase, DynamoDB, etc.

#4 Favor treeReduce(..) or treeAggregate(..) over reduce(..) or aggregate(..): to limit the amount of data returned to the driver. Unlike the reduce() action, which can return a large volume of data to the driver and cause it to run out of memory, treeReduce() combines partial results in stages on the executors, reducing the data returned to the driver. Internally, treeReduce() calls reduceByKey() in stages, depending on the defined depth.
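A minimal sketch contrasting the two on a toy RDD, reusing the spark session from the earlier sketch (depth=2 is the default tree depth):

rdd = spark.sparkContext.parallelize(range(1, 101), 10)

total      = rdd.reduce(lambda a, b: a + b)               # every partition's result goes to the driver
tree_total = rdd.treeReduce(lambda a, b: a + b, depth=2)  # partial results combined on executors first

print(total, tree_total)   # 5050 5050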

#5 Avoid or minimize shuffle: Shuffling is an expensive operation since it involves disk I/O, data serialization and network I/O. Even though moving data is expensive, sometimes it's necessary; for example, certain operations need to consolidate data on a single node so that it can be co-located in memory. RDD operations that require shuffling include repartition(), groupByKey(), reduceByKey(), cogroup() and join(). The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled.

Apache Spark groupByKey Vs reduceByKey

groupByKey() on a large RDD causes all key-value pairs to be shuffled across all the executors in the cluster, transferring unnecessary data over the network. Favor reduceByKey(), combineByKey() or foldByKey() instead. With reduceByKey(), pairs with the same key are combined (reduced) within each partition before the data is shuffled, which results in less data being sent over the network. For example:

The PySpark reduceByKey() transformation merges the values of each key using an associative reduce function. It is a wide transformation, so it shuffles data across RDD partitions, but only after the values have been partially combined within each partition.
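A minimal sketch (the sample pairs are illustrative):

rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 5), ("b", 4)])
# values for each key are pre-combined within every partition before the shuffle
print(rdd.reduceByKey(lambda x, y: x + y).collect())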

Output: [('a', 6), ('b', 6)]

The groupByKey() version below produces the same output:
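A sketch using the same rdd as above:

# every (key, value) pair is shuffled; the summing only happens after the shuffle
print(rdd.groupByKey().mapValues(sum).collect())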

Output: [('a', 6), ('b', 6)]

But groupByKey() transfers the entire dataset across the network, whilst reduceByKey() computes local sums for each key in each partition first, and only those reduced local sums are shuffled and combined into the final sums. In other words, reduceByKey() combines records with the same key on the map side, whereas groupByKey() does not.

You can also explicitly partition a pair RDD with partitionBy() using a "HashPartitioner", a "RangePartitioner" or a custom partition function. For example:
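A minimal sketch, assuming a pair RDD keyed by integer ids; in PySpark, partitionBy(8) hash-partitions by key using the default portable_hash function:

pair_rdd = spark.sparkContext.parallelize([(i, "row-" + str(i)) for i in range(20)])

hash_partitioned = pair_rdd.partitionBy(8)            # hash partitioning into 8 partitions
print(hash_partitioned.getNumPartitions())            # 8
print(hash_partitioned.glom().map(len).collect())     # record count per partition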

Where 8 is the number of partitions. Let’s apply the custom partitioner.
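A sketch of a custom partition function applied to the same pair RDD (the even/odd rule is purely illustrative):

def custom_partitioner(key):
    # route even keys to partition 0 and odd keys to partition 1
    return 0 if key % 2 == 0 else 1

custom_partitioned = pair_rdd.partitionBy(8, custom_partitioner)
print(custom_partitioned.glom().map(len).collect())   # [10, 10, 0, 0, 0, 0, 0, 0]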

Also, if you join a very large table with a smaller table, pre-partition the large table before joining it with the smaller table to minimise the amount of data shuffled (i.e. only the data in the smaller table will be shuffled).

#6 Avoid the flatMap-join-groupBy pattern: when two datasets are already grouped by key and you want to join them and keep them grouped, use cogroup, as it avoids the overhead associated with unpacking and repacking the groups.
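A minimal PySpark sketch of cogroup (the two small pair RDDs are illustrative):

rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 4)])
rdd2 = spark.sparkContext.parallelize([("a", 2)])

# cogroup() returns, for each key, one iterable of values per source RDD
print([(x, tuple(map(list, y))) for x, y in sorted(rdd1.cogroup(rdd2).collect())])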

The map(func, iterator) function in Python applies the given func to each item of the iterator. In the above example, map(list, y) converts each of the grouped iterables in y into a list.

Output: [('a', ([1], [2])), ('b', ([4], []))]

The key point here is that you want to avoid a shuffle, and you can avoid it if both RDDs are partitioned in the same way using the same partitioner, which means all values for the same key are already in the same partition in each RDD. This won't be true if you flatMap one of the RDDs before joining, as flatMap may not retain the partitioning.

#7 Use aggregateByKey over reduceByKey when the result type differs from the value type: the two are logically similar, but aggregateByKey lets you return a result of a different type than the input values. When the input and output values are of different types, reduceByKey forces you to create lots of unnecessary intermediate objects. For example, consider a transformation that needs to find all the unique strings corresponding to each key:
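A sketch of the reduceByKey() approach (the sample pairs are illustrative); a brand-new set has to be created for every record and for every merge:

pairs = spark.sparkContext.parallelize(
    [("k1", "a"), ("k1", "b"), ("k1", "a"), ("k2", "c")])

unique_per_key = pairs.map(lambda kv: (kv[0], {kv[1]})) \
                      .reduceByKey(lambda s1, s2: s1 | s2)
print(unique_per_key.collect())       # e.g. [('k1', {'a', 'b'}), ('k2', {'c'})]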

This results in lots of unnecessary objects because a new Set must be allocated for each record. It's better to use aggregateByKey, which performs the map-side aggregation more efficiently:
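A sketch of the aggregateByKey() version on the same pairs RDD; the values are strings but the result is a set, and the per-key set is mutated and reused instead of being re-allocated for every record:

def add_value(acc, value):            # fold one value into the per-partition set
    acc.add(value)
    return acc

def merge_sets(acc1, acc2):           # merge the sets produced by different partitions
    acc1 |= acc2
    return acc1

unique_per_key = pairs.aggregateByKey(set(), add_value, merge_sets)
print(unique_per_key.collect())       # e.g. [('k1', {'a', 'b'}), ('k2', {'c'})]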

#8 Partition pruning: a performance optimisation that limits the number of files and partitions that Spark reads when querying. It applies to partitioned data, where queries whose filter criteria match certain partitions allow Spark to read only a subset of the directories and files, improving performance. Spark supports saving data in a partitioned layout seamlessly through the partitionBy method available during data source write operations. Learn more at Spark interview Q&As on partition pruning with coding examples in Scala.
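A minimal sketch, assuming a hypothetical events_df DataFrame with a country column and an illustrative output path:

# write in a partitioned layout: one sub-directory per distinct country value
events_df.write.mode("overwrite").partitionBy("country").parquet("/tmp/events")

# only the country=AU sub-directory is scanned - the other partitions are pruned
au_events = spark.read.parquet("/tmp/events").where("country = 'AU'")
au_events.explain()   # the file scan shows PartitionFilters such as (country = AU)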

#9 Predicate push-down: a Spark feature that attempts to push the filtering of data as close as possible to the source, to avoid loading unnecessary data into memory. Spark makes use of partition pruning and the Parquet file format to improve query performance by reducing the amount of data read. Predicate push-down works by evaluating the filter or where clause predicates in the query against the static partition info (to prune directories) and against the metadata stored in the Parquet files. Parquet can optionally store statistics such as the minimum and maximum values for a column chunk, so Spark can skip reading chunks of data if the filter predicate value in the query is outside the range of values stored for that column. When there is no static partitioning, Spark can make use of dynamic partition pruning.
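A minimal sketch, assuming a hypothetical trades Parquet dataset with a numeric price column:

from pyspark.sql.functions import col

trades = spark.read.parquet("/tmp/trades")
filtered = trades.where(col("price") > 100)

# the physical plan lists the predicate under PushedFilters,
# e.g. PushedFilters: [IsNotNull(price), GreaterThan(price,100)]
filtered.explain()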

Predicate or Filter Push Down

Row vs Columnar data storage

The Parquet columnar data format gives the best read performance with Spark due to:

1) Optimised query plans with the implicit selection of efficient join strategies (e.g. broadcast join).
2) Minimised I/O via compression & storage of related values in close proximity to each other as depicted below.
3) Conserved resources by detecting & encoding the same or similar data (e.g. dictionary and run-length encoding).
4) Pushed-down column filtering by storing column metadata & statistics.
5) Vectorised Parquet reader that decompresses and decodes in batches to provide faster read performance.

Row Vs Columnar storage [Ref: https://karthiksharma1227.medium.com/understanding-parquet-and-its-optimization-opportunities-c7265a360391]

#10 Use of broadcast variables & accumulators: Spark provides two mechanisms in particular to reduce or avoid data movement via shuffling.

Broadcast variables enable you to keep a read-only cache of, say, lookup data on each executor in the cluster rather than shipping a copy of it with every task. Spark distributes broadcast variables using efficient broadcast algorithms to reduce the communication cost. Broadcast variables enable efficient joins between large and small RDDs, or let you store a lookup table in memory for more efficient retrieval than an RDD lookup(). Broadcast only when the data is small enough (e.g. less than ~1 GB or around a million rows) to fit in the executor memory. A typical example of a small RDD is a dimension table like customer, service, subscriber or account, to be joined with a large fact-table RDD like order, transaction or call detail records.

Explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data. A typical use case is performing static lookups against a small dataset (e.g. an array or map) by joining it with your bigger dataset.
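A minimal sketch of both styles, assuming hypothetical fact_df (large) and dim_df (small) DataFrames sharing a customer_id column, plus an illustrative country_code column on the fact table:

from pyspark.sql.functions import broadcast

# 1) broadcast join hint - the small dimension table is shipped to every executor
enriched_df = fact_df.join(broadcast(dim_df), "customer_id")

# 2) explicit broadcast variable for a static lookup (a plain Python dict)
country_names = spark.sparkContext.broadcast({"AU": "Australia", "NZ": "New Zealand"})
labelled_rdd = fact_df.rdd.map(
    lambda row: (row["customer_id"], country_names.value.get(row["country_code"], "Unknown")))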

Accumulators enable you to efficiently update a variable in parallel during execution. Accumulators differ from read-only broadcast variables by allowing Spark programs to efficiently aggregate results such as counters, sums or generated lists. Accumulators are not limited to basic types; they can accumulate any Accumulable class.
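A minimal sketch of a counter accumulator (the sample lines and the validation rule are illustrative):

bad_records = spark.sparkContext.accumulator(0)

def validate(line):
    if "," not in line:              # illustrative validation rule
        bad_records.add(1)           # updated on the executors

lines = spark.sparkContext.parallelize(["a,1", "b,2", "malformed"])
lines.foreach(validate)
print(bad_records.value)             # 1 - the aggregated value is read on the driver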

Q10: What if you have a medium-sized dimension table to be joined with a large table?
A10: If the medium-sized RDD does not fit fully into memory but its keys do, you can broadcast just the keys from the medium-sized RDD and use them to filter out the elements of the larger RDD that have no matching key, reducing the size of the data before the shuffle. If a significant number of elements get discarded this way, the resulting shuffle will need to transfer a lot less data.

It is important to note that this optimisation depends on the filter operation reducing the size of the larger RDD by a reasonable amount. Otherwise there is not much efficiency to be gained with this strategy.

In the below PySpark example, custDF is the medium dimension table & callDetailDF is the large call detail records table:
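A minimal sketch of the two DataFrames (the schemas and rows are illustrative stand-ins for the real tables):

custDF = spark.createDataFrame(
    [(101, "Acme"), (102, "Globex")],
    ["cust_id", "cust_name"])

callDetailDF = spark.createDataFrame(
    [(101, "2023-06-01", 120), (103, "2023-06-01", 45), (104, "2023-06-02", 300)],
    ["cust_id", "call_date", "duration_secs"])

callDetailDF.show()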

Let’s filter out the large table based on the keys from the medium table:
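A sketch of the key-based pre-filter: collect just the (small) key column of the medium table to the driver, broadcast it, and filter the large table on the executors:

cust_keys = {row["cust_id"] for row in custDF.select("cust_id").distinct().collect()}
bc_keys = spark.sparkContext.broadcast(cust_keys)

# only call records whose cust_id exists in the medium table survive,
# so far less data takes part in any subsequent join/shuffle
filtered_rdd = callDetailDF.rdd.filter(lambda row: row["cust_id"] in bc_keys.value)
filteredDF = spark.createDataFrame(filtered_rdd, callDetailDF.schema)
filteredDF.show()    # with the illustrative data above, only the cust_id 101 row remains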

#11 to #30: 40+ Apache Spark best practices & optimisation interview FAQs – Part-2

#31 to #37: 40+ Apache Spark best practices & optimisation interview FAQs – Part-3

The examples can be tried on PySpark on Databricks Tutorials.

