15+ Apache Spark best practices, memory mgmt & performance tuning interview FAQs – Part-2

This extends 15+ Apache Spark best practices, memory mgmt & performance tuning interview FAQs – Part-1, where best practices 1-6 were covered with examples & diagrams.

#7 Use Spark UI: Running Spark jobs without inspecting the Spark UI is a definite NO. It is a very handy debugging & performance tuning tool.

Spark job execution model

The UI allows you to monitor and inspect the execution of jobs. Stages, tasks, and shuffle writes and reads are concrete concepts that can be monitored from the Spark UI.

Spark UI showing stages, tasks & shuffles

A stage is a physical unit of execution: a set of parallel tasks, one task per partition. At shuffle boundaries, the DAG is divided into so-called stages that are executed in order. So, if your job has many stages, it is performing that many shuffle operations.

Shuffle boundaries denoting the stages

You can click on a job id to view the DAG (Directed Acyclic Graph) of the RDD objects.

DAG of a job

Here is a popular Apache Spark Job Interview question:

Q. Given that you are processing a huge text file with the following steps, how many jobs & stages will be created in the DAG? What are some of the key considerations to watch out for?

1) Load the whole file into a DataFrame df1.
2) Apply groupBy(..), filter(..) & map(..) transformations.
3) Call collect( ) and then count( ) on the result.
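
A minimal Scala sketch of such a pipeline (the file path, column names and exact transformations are hypothetical placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jobs-and-stages").getOrCreate()
import spark.implicits._

// 1) Load the whole file into a DataFrame df1.
val df1 = spark.read.text("hdfs:///data/huge-file.txt")   // hypothetical path

// 2) Transformations - lazily evaluated, nothing runs yet.
val df2 = df1
  .groupBy($"value").count()     // wide transformation: shuffle boundary => new stage
  .filter($"count" > 1)          // narrow transformation
  .map(row => row.getString(0))  // narrow transformation (Dataset map)

// 3) Actions - each one triggers a job over the DAG above.
val rows = df2.collect()         // job #1
val n    = df2.count()           // job #2
```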

A. Number of Jobs = 2,

Job #1 for collect( )

Job #2 for count( )

as these two are actions (i.e. terminal operations). The groupBy(..), filter(..) & map(..) are transformations that are lazily evaluated and only run when an action is called.

Number of stages = 2

Stage #1 for load(…)

Stage #2 for groupBy(..), filter(…), & map(…).

There will be 2 stages because a new stage is created whenever there is a wide transformation. groupBy is a wide transformation, hence a new stage is created; filter(..) & map(..) are narrow transformations.

Considerations: Beware of using collect() against very large data, as this brings the whole dataset back to the driver & you are likely to get an out-of-memory error. Consider coalescing to a smaller number of partitions, e.g. coalesce(10), and then writing to a storage like HDFS or AWS S3.
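
Building on the hypothetical df2 above, a hedged sketch of the safer alternative (the bucket/path is a placeholder):

```scala
// Shrink the number of partitions and write to distributed storage
// instead of collecting everything to the driver.
df2.coalesce(10)
  .write
  .mode("overwrite")
  .text("s3a://my-bucket/output/")   // or an HDFS path
```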

#8 When filtering large DataFrames into smaller ones frequently, repartition the filtered DataFrame: Spark doesn’t adjust the number of partitions when a large DataFrame is filtered. For example, if your initial DataFrame had, say, 1 billion rows, and you created a sample with:
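
A hedged one-liner, assuming a hypothetical column and predicate:

```scala
val filteredDf = initialDf.filter(initialDf("stock_code") === "XYZ")   // hypothetical filter
```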

“initialDf” with 1 billion records may have had 15,000 partitions, and when that is filtered down to “filteredDf” with 1,000 rows, “filteredDf” will still have 15,000 partitions, which means most partitions will hold no data at all. This can impact performance. When you repartition, size the new partition count at roughly 3 to 4 times the number of CPU cores in the cluster so that the workload is evenly distributed (scaling the original 15,000 partitions down by the ~1,000,000x reduction in data leaves essentially nothing, so the core-based rule is what matters). If you only have 1 partition per CPU core, then you have to wait for the single longest-running task to complete.
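
For example (the core count is illustrative):

```scala
// e.g. a cluster with 40 CPU cores and 3-4 partitions per core:
val repartitionedDf = filteredDf.repartition(40 * 4)
```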

#9 Watch out for skewed data: Skewing happens when the data is not distributed evenly across the cluster nodes, so one or two nodes end up processing most of it. For example, when you join two tables by keys and, say, 60% of the keys are either null or a particular value (e.g. the most traded stock id), then 60% of all the data will be processed by a single node. This will adversely impact performance in a distributed computing environment.

Q. How do you know if you have a skew?
A. In your DAG, if one executor core is taking too long to process compared to the others then you have a data skew.

There is a big difference in the input data size below between the “Min to 75th percentile” range and the “75th percentile to Max” range.

Spark Data Skew [source: https://datarus.files.wordpress.com/2015/05/skewed_join_ui.png]

In the below diagram you can see the “Duration” being 1.3 min (i.e. ~78 seconds) for one executor and ~2s for the rest. This means one executor took ~40 times longer to process its share of the data.

Spark Skewed Data 2

Q. How do you fix skew?
A. By default Spark uses hashing, as sketched below, to split the work across the nodes. This is Hash Partitioning.
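
A simplified sketch of the idea behind Spark's HashPartitioner (the target partition is the key's hashCode modulo the number of partitions):

```scala
// Simplified view of hash partitioning: derive the partition from the key's
// hashCode modulo the number of partitions, kept non-negative.
def getPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0)
}
```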

The above function returns the same partition for a given key, which causes all the data with the same key to go to the same node. To fix this, add a salt to the key, which adds some randomness. For example, if a key column value, say “stock_code=xyz”, accounts for 50% of the trades, you can create randomness by splitting it into xyz_1, xyz_2, etc. by adding a salt of 1, 2, etc. This way the stock_code is spread across more nodes, giving better performance through better parallelism (i.e. distribution & execution of the data).
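
A hedged salting sketch (DataFrame and column names are hypothetical; largeDf and smallDf stand for the two sides of a join skewed on stock_code):

```scala
import org.apache.spark.sql.functions._

val numSalts = 10

// Large side: append a random salt 0..9 to the skewed key.
val saltedLarge = largeDf.withColumn(
  "salted_key",
  concat(col("stock_code"), lit("_"), (rand() * numSalts).cast("int").cast("string")))

// Small side: replicate each row once per salt value so every salted key still matches.
val saltedSmall = smallDf
  .withColumn("salt", explode(array((0 until numSalts).map(i => lit(i)): _*)))
  .withColumn("salted_key", concat(col("stock_code"), lit("_"), col("salt").cast("string")))

val joined = saltedLarge.join(saltedSmall, "salted_key")
```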

There are other techniques to deal with skewed data:

1) Increasing the number of partitions so that a given key can be hashed into more partitions, but this technique will not work when relatively few keys dominate the data.

2) Using a BroadcastHashJoin strategy for joining smaller tables with larger tables (see the sketch after this list). This involves broadcasting the smaller (~1 GB) to-be-joined table to each executor’s memory, then streaming the larger table and joining row by row. As the memory footprint of the smaller table increases, the memory pressure will also increase, and the viability of this technique will decrease.

3) When the smaller table becomes too big to broadcast comfortably (say 10+ GB), you can consider iteratively taking, say, 1 GB slices of it, broadcasting each slice, joining it with the larger table, and then unioning the results.
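
A hedged sketch of technique 2, the broadcast hash join (DataFrame names and the join key are hypothetical):

```scala
import org.apache.spark.sql.functions.broadcast

// Hint Spark to ship the small table to every executor so the big table is never shuffled.
val joined = largeFactDf.join(broadcast(smallDimDf), Seq("stock_code"))
```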

So, analyse the data & usage pattern to devise a strategy to combat skewed data.

#10 Watch out for Cartesian joins: A Cartesian join is a join of every row of one table to every row of another table. For example, if table A has 100 rows with key “xyz” and is joined with table B, which has 10,000 rows for key “xyz”, a Cartesian join will produce 1,000,000 rows. This gets rapidly worse as more of these many-to-many keys appear in your data: distributed systems scale linearly, not with this kind of multiplicative blow-up.

Q. How will you fix cartesian join issues?
A. You can solve this in a number of ways:

1) Nested structures: for example, having a schema where the related rows are nested under the parent row, e.g. by creating a Hive table with a nested data structure:
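
A hedged sketch (table and column names are hypothetical; assumes a Hive-enabled SparkSession named spark). The trades for each stock are nested as an array of structs, so the one-to-many relationship is materialised in the schema instead of exploding in a join:

```scala
spark.sql("""
  CREATE TABLE IF NOT EXISTS stocks_nested (
    stock_code STRING,
    stock_name STRING,
    trades     ARRAY<STRUCT<trade_id: BIGINT, qty: INT, price: DOUBLE>>
  )
  STORED AS PARQUET
""")
```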

2) Windowing function: for example, in some scenarios you can rank the rows and keep only the latest value before joining (see the window sketch after this list).

3) reduceByKey function: to reduce the data on each node before joining.
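
A hedged sketch of the windowing approach (DataFrame and column names are hypothetical): keep only the latest trade per stock so the subsequent join produces one row per key:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy(col("stock_code")).orderBy(col("traded_at").desc)

val latestPerStock = tradesDf
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")
```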

#11 Tune Spark: Spark jobs can be tuned with a myriad of runtime config arguments like spark.driver.memory, spark.executor.memory, spark.memory.fraction, spark.akka.frameSize, and so on. For example, if your 2 GB RDD is only partitioned into 10 partitions, then to serialize the data from one partition to another node you will need to ship a ~200 MB chunk of data per partition via Akka. But, by default, Akka’s buffer is only 128 MB, and you can get around this by setting “spark.akka.frameSize” to a larger value.
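
A hedged sketch of setting such knobs programmatically (values are illustrative; spark.akka.frameSize, in MB, only applies to older Spark versions that still ship Akka):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("tuned-job")
  .set("spark.executor.memory", "8g")
  .set("spark.memory.fraction", "0.75")
  .set("spark.akka.frameSize", "512")   // MB; older Akka-based versions only

val sc = new SparkContext(conf)
```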

#12 Smart use of cache: If you use the same data twice, cache it. Unused RDDs that go out of scope will be automatically un-persisted, but they can also be explicitly released with unpersist(). Having said this, broadcast variables can be more efficient than caching RDDs where applicable, as lookup() on an RDD is O(n), where n is the length of data in a single partition, whereas a broadcast hash map has O(1) lookups.
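
A hedged sketch (the path and record layout are hypothetical):

```scala
// Cache an RDD that more than one action will use:
val ratesRdd = sc.textFile("hdfs:///data/rates.csv")
  .map(_.split(","))
  .map(a => (a(0), a(1).toDouble))             // (currency, rate)
  .cache()

println(ratesRdd.count())                      // action 1 - materialises & caches
println(ratesRdd.filter(_._2 > 1.0).count())   // action 2 - served from the cache

// For small lookup data, a broadcast hash map gives O(1) lookups per task:
val ratesMap = sc.broadcast(ratesRdd.collectAsMap())
val usdRate  = ratesMap.value.getOrElse("USD", 1.0)

ratesRdd.unpersist()                           // release explicitly when no longer needed
```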

#13 When is more shuffles better? : There are 2 occasional exceptions to the rule of minimizing the number of shuffles.

Firstly, when your input data is read from large unsplittable files (e.g. JSON or XML), a large number of records is placed in each of relatively few partitions. This hampers the parallelism that takes advantage of the cluster’s CPUs. In this case, invoking repartition with a higher number of partitions (this will trigger a shuffle) after loading the data allows the tasks that come after it to leverage more of the cluster’s CPUs.
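
For example (the path and partition counts are illustrative):

```scala
// A large unsplittable file lands in very few partitions; one extra shuffle via
// repartition lets downstream tasks use all of the cluster's cores.
val raw = sc.textFile("hdfs:///data/huge.xml.gz")
println(raw.getNumPartitions)        // e.g. 1 for a single gzipped file

val spread = raw.repartition(200)    // pick roughly 2-4 x total CPU cores
```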

Secondly, when aggregating over a high number of partitions, the computation can quickly become bottlenecked on the single-threaded driver as it merges all the results together. This can be alleviated by performing a reduceByKey or aggregateByKey (which triggers a shuffle) so that a round of distributed aggregation on the executors combines the dataset into a smaller number of partitions before the results are sent to the driver for a final round of aggregation. You can also refer to the treeAggregate and treeReduce functions in the Spark API.
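
A hedged sketch (eventsRdd, an RDD[(String, Long)], is hypothetical):

```scala
// Pre-aggregate on the executors so the driver only merges a small, combined result.
val perKey = eventsRdd
  .reduceByKey(_ + _, 100)   // distributed aggregation into 100 partitions
  .collect()                 // only the reduced pairs reach the driver

// For a single global value, treeReduce/treeAggregate merge in multiple rounds
// on the executors instead of all at once on the driver:
val total = eventsRdd.values.treeReduce(_ + _)
```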

#14 Use toDebugString to print out the lineage of an RDD. Pay attention to the “ShuffledRDD”s and to wide (e.g. groupByKey, join, etc.) vs narrow (e.g. map, flatMap, filter, etc.) dependencies.
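
For example (the path is hypothetical):

```scala
val words  = sc.textFile("hdfs:///data/book.txt").flatMap(_.split(" "))   // narrow
val counts = words.map((_, 1)).reduceByKey(_ + _)                         // wide (shuffle)

// The indented lineage shows a ShuffledRDD where a new stage begins.
println(counts.toDebugString)
```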

Narrow dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD.

Wide dependencies: Each partition of the parent RDD may be used by multiple child partitions.

Transformations with (usually) narrow dependencies: map, mapValues, flatMap, filter, mapPartitions and mapPartitionsWithIndex.

Transformations with (usually) wide dependencies, which may cause shuffling: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, distinct, repartition, coalesce, etc.

#15 Cache (i.e. persist) to memory/disk when you perform more than one action on a DAG, otherwise the whole DAG (e.g. map & filter) will be re-executed for the second and subsequent actions.

By default an RDD is persisted to memory, i.e. cached. You can also spill to disk when the data does not fit in memory:
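
A hedged sketch (rddA and rddB are hypothetical; pick one storage level per RDD, as it cannot be changed once assigned):

```scala
import org.apache.spark.storage.StorageLevel

rddA.cache()                                 // same as persist(StorageLevel.MEMORY_ONLY)
rddB.persist(StorageLevel.MEMORY_AND_DISK)   // spill to local disk when memory runs out
```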

#16 Configure the executors & the executor cores appropriately.

Q. What is the difference between executor & executor core?
A. An executor is a JVM process within a YARN container, and an executor core is a thread within that JVM. Each task is processed by a thread, and each task works on a subset of the data.

Q. If you have a 10 node (i.e. machine) cluster where each node has 16 cores (i.e. CPU cores) and 64 GB memory, how would you go about allocating the no. of executors, executor memory, etc. for good performance?
A.

No. of executors & executor cores

No. of total cores = 10 nodes/machines * 16 cores = 160 cores
1 core on each node for O/S = 160 cores – 10 cores = 150 cores
No. of cores available per node = 150 cores/10 nodes = 15 cores
No. of recommended cores to use per executor = 5 (i.e. say 1/3 to prevent HDFS contention)
No. of executors per node = 15/5 = 3

So, in each node/machine, 3 JVM processes & 5 threads per JVM = 15 threads per node. 5 executor cores means each executor can run a maximum of five tasks at the same time.

Executor memory

Total available per node = 64 GB
Total available after 1 GB for O/S = 64 GB – 1 GB = 63 GB.
Available per executor = 63 / 3 = 21 GB
Available after allocation 3 GB for YARN overhead (i.e. off heap buffer memory) = 18 GB
Per executor memory = 18 GB
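
A hedged sketch of these numbers as runtime config (YARN deployment assumed; in practice one executor slot is often given up to the YARN ApplicationMaster):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "30")              // 3 executors x 10 nodes
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "18g")
  .set("spark.yarn.executor.memoryOverhead", "3072")  // ~3 GB off-heap overhead, in MB
```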

Apache Spark’s RDDs are collections of data so big that they cannot fit on a single node and must be partitioned across multiple nodes. Each partition is processed by an executor. Spark automatically partitions the data & distributes the partitions across different nodes. The partition is the main unit of parallelism in Apache Spark.

In each node, “num-executors (JVMs) * executor-cores (i.e. threads)” = slots available to run tasks.

Tasks that run on the same executor will share the same JVM. This is used by the Broadcast feature as you only need one copy of the Broadcast data per Executor for all tasks to be able to access it through shared memory. Each task takes up one thread.

When you apply a transformation to an RDD, the transformation is applied to each of its partitions. Spark spawns a single task per partition, which runs inside an executor (i.e. JVM process).

Q. What is the difference between repartition & coalesce?
A. Repartition distributes the data evenly across the machines, which requires a full reshuffle, hence it is slower than coalesce; but it can improve the overall performance of the job as it prevents data skew by evenly distributing the data across the executors.

Spark repartition

Repartition can either reduce or increase the number of partitions. You can get the current number of partitions via:
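
For example (df and rdd are hypothetical handles):

```scala
rdd.getNumPartitions       // for an RDD
df.rdd.getNumPartitions    // for a DataFrame / Dataset
```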

Coalesce merges partitions on the same machine, hence it minimises the data shuffle & is faster than repartition, but the data can end up skewed, where one machine has more data than another.

For example:

4 partitions:

Node 1: 1,2,3
Node 2: 4,5
Node 3: 6,7
Node 4: 8,9,10,11,12,13,14

After coalesce(2), 2 partitions:

Node 1: 1,2,3 + 8,9,10,11,12,13,14 = 10 records
Node 3: 6,7 + 4,5 = 4 records

As you can see, nodes 1 & 3 did not need to move their original data. So coalesce minimises the reshuffle, but the data can be skewed, as Node 1 now has more data.

Coalesce can only reduce the number of partitions, NOT increase them. You can get the number of partitions in the same way as shown above.

Q. Which is faster, repartition or coalesce?
A. coalesce runs faster than repartition due to less shuffling, but unequal sized partitions are generally slower to work with than equal sized partitions. Spark is built to work with equal sized partitions, hence it’s critical to repartition after running filtering queries as the number of partitions does not change after filtering.

coalesce is recommended when reducing the number of partitions. For example, if you have 3 partitions and you want to reduce them to 2, coalesce will move the 3rd partition’s data into partitions 1 and 2. Partitions 1 and 2 will remain in their original containers, i.e. less shuffling.

Q. Given a 16 GB executor memory, how will it be distributed?
A. Spark Executor memory distribution is shown below:

Spark Executor Memory Distribution

NOTE: In Spark 1.6 and above, there is no hard boundary between Execution memory and Storage memory.

1. Storage & Execution memory can borrow space from each other as long as the borrowed blocks are not in use by the other.

2. If Storage memory has borrowed space from Execution memory and Execution needs more memory, Execution can forcefully evict the excess blocks occupied by Storage memory.

3. If Execution memory has borrowed space from Storage memory and Storage needs more memory, Storage cannot forcefully evict the excess blocks occupied by Execution memory; it ends up with less memory and has to wait until Spark releases the blocks held by Execution memory before it can occupy that space.

User Vs Spark Memory

The User Vs Spark memory can be controlled by the below factor:

spark.memory.fraction — defaults to 0.75

Storage Vs Execution Memory

The Storage Vs Execution Memory can be controlled by the below factor:

spark.memory.storageFraction — defaults to 0.5
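
Using these defaults, a 16 GB executor heap is split roughly as follows (a worked sketch assuming the Spark 1.6 defaults above, with ~300 MB set aside as Reserved memory):

Reserved memory = ~300 MB
Usable memory = 16 GB – 300 MB ≈ 15.7 GB
Spark memory (Storage + Execution) = 15.7 GB * 0.75 ≈ 11.8 GB
Storage memory (initial half) = 11.8 GB * 0.5 ≈ 5.9 GB
Execution memory (initial half) = 11.8 GB * 0.5 ≈ 5.9 GB
User memory = 15.7 GB * 0.25 ≈ 3.9 GB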

Q. What happens if the executor memory is less than 1.5 times the reserved memory, i.e. 1.5 × 300 MB = 450 MB?
A. Spark will fail with a “please use larger heap size” error message.

