Spark interview Q&As with coding examples in Scala – part 2

This extends Spark interview Q&As with coding examples in Scala – part 1 with the key optimisation concepts.

Partition Pruning

Q13. What do you understand by the concept Partition Pruning?
A13. Partitioning Spark & Hive tables by keys such as year, month, country, department, etc. optimises reads by storing files in a hierarchy of directories based on the partitioning keys, hence reducing the amount of I/O needed to process your query. Partition pruning prevents a full scan of the data by reading only the partitions that match a filter on the partitioning key and skipping the rest.

For example, you can use active_flag, year & month as partition keys when storing a large volume of data.

Spark partition pruning
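The layout referred to above might look like this (a sketch based on the paths used in this example; the base folder and partition values are illustrative):

/my/base/folder/active_flag=Y/year=2020/month=01/part-00000.parquet
/my/base/folder/active_flag=Y/year=2020/month=02/part-00000.parquet
/my/base/folder/active_flag=Y/year=2019/month=12/part-00000.parquet
/my/base/folder/active_flag=N/year=2020/month=01/part-00000.parquet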

If the files are stored as above, then when you apply the below filter on active_flag, only “/my/base/folder/active_flag=Y/” and its year & month sub-folders & parquet files will be scanned. The path to “active_flag=N” will be ignored, which prevents a full scan.
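For example (a minimal sketch, assuming an active SparkSession named spark and the layout above):

import org.apache.spark.sql.functions.col

// Only /my/base/folder/active_flag=Y/ and its sub-folders are scanned;
// the active_flag=N branch is skipped
val activeDf = spark.read.parquet("/my/base/folder")
  .filter(col("active_flag") === "Y")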

If you also filter on year as shown below, only “/my/base/folder/active_flag=Y/year=2020/” and its month sub-folders & parquet files will be scanned.
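Continuing the same sketch, with an additional year filter:

// Only /my/base/folder/active_flag=Y/year=2020/ and its month sub-folders are scanned
val active2020Df = spark.read.parquet("/my/base/folder")
  .filter(col("active_flag") === "Y" && col("year") === 2020)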

If you filter on month as well, only “/my/base/folder/active_flag=Y/year=2020/month=01/” and its parquet files will be scanned.
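Continuing the same sketch, with the month filter added:

// Only /my/base/folder/active_flag=Y/year=2020/month=01/ is scanned
val active202001Df = spark.read.parquet("/my/base/folder")
  .filter(col("active_flag") === "Y" && col("year") === 2020 && col("month") === 1)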

Q14. How will you write a DataFrame out partitioned?
A14. Using the “partitionBy” method on the DataFrameWriter.
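For example (a sketch; df is an illustrative DataFrame and the output path follows the layout above):

df.write
  .partitionBy("active_flag")
  .mode("overwrite")
  .parquet("/my/base/folder")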

Passing multiple columns:
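Using the same illustrative df and path:

df.write
  .partitionBy("active_flag", "year", "month")
  .mode("overwrite")
  .parquet("/my/base/folder")

This creates the active_flag=…/year=…/month=… directory hierarchy shown earlier.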

Predicate Push Down

A “predicate” in maths & functional programming is a function that returns a boolean value. A predicate in SQL is a WHERE clause used to filter data. In general, a JOIN is performed before filtering the data in a WHERE clause. Predicate Push Down is an optimization technique used to apply filtering before a join to avoid loading unnecessary data into memory.

Q15. What is Predicate/Filter Push Down?
A15. When you execute where or filter operators right after reading a dataset, and partition filters are present, the Spark Catalyst optimizer pushes the partition filters down to the scan, as shown below. The scan then reads only the sub-directories that match the partition filters (e.g. active_flag=Y/year=2020/month=01/), hence reducing the disk I/O.

Predicate or Filter Push Down
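A sketch of this behaviour (illustrative path and column names; the exact plan text varies by Spark version):

import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/my/base/folder")
  .where(col("active_flag") === "Y" && col("year") === 2020 && col("month") === 1)

// The FileScan node of the physical plan lists the pushed-down PartitionFilters,
// confirming that only the matching sub-directories are read
df.explain(true)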

Dynamic Partition Pruning

Q16. In reality, you will have a large fact table like orders & a small dimension table like products. Can you apply static partition pruning in this scenario?

A16. In the above example, the fact table ORDERS will be a huge table with millions of rows, while PRODUCTS is a small dimension table with 100K records. Static partition pruning is not beneficial on the small PRODUCTS table; the table that really benefits from pruning is the huge ORDERS table, which is partitioned by order_date & product_id.

You could first join the small dimension table with the huge fact table into an intermediate table with static partitioning on “product_category”. The obvious downside is that the join operation to create an intermediate table with all rows from both tables is expensive, and you are also duplicating the whole data in that intermediate table.

Spark 3.0 introduced Dynamic Partition Pruning to optimise this type of scenario by taking the filtered results from the dimension table and using them directly to limit the data read from the fact table.

Conditions for Dynamic Partition Pruning are:

1) The table to be pruned (usually the larger fact table) must be partitioned by one of the join key columns. In the above example, the ORDERS table is partitioned by “product_id”.

2) It works only with equi-joins, i.e. the join condition must use “=”, such as o.product_id = p.product_id; it does not apply to non-equality join conditions.

The steps involved are:

1) Scan the dimension table PRODUCTS & apply the filter (i.e. p.product_category != “WHITE_GOODS”). If the dimension table PRODUCTS is itself partitioned, this filter is pushed down before the scan of the dimension table.

2) Spark creates an inner subquery from the filtered dimension table PRODUCTS, which is broadcast and hashed across all the executors. This subquery is used to prune unwanted partitions from the fact table ORDERS during the scanning phase.

3) Join only the selected partitions from the fact table ORDERS with the filtered dimension table PRODUCTS.
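A sketch of a query that can benefit from Dynamic Partition Pruning in Spark 3.x (the orders/products tables and their columns are illustrative; the feature is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled, which defaults to true):

val result = spark.sql("""
  SELECT o.order_id, o.amount, p.product_name
  FROM orders o
  JOIN products p
    ON o.product_id = p.product_id          -- equi-join on the partition column of orders
  WHERE p.product_category != 'WHITE_GOODS' -- filter on the dimension table
""")

// With DPP, the scan of the partitioned orders table is limited to the product_id
// partitions whose products survive the filter above
result.explain(true)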

Constant Folding

Q17. What is Constant Folding?
A17. Constant Folding is an operator optimisation rule that replaces expressions which can be statically evaluated with their equivalent literal values.


It is better to compute the expression lit(3) * 2 once and reuse the literal 6 for every row, rather than re-evaluating it per row. This is what “constant folding” does.
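A minimal sketch that makes the folding visible in the plan (assuming an active SparkSession named spark; the expression ID such as #155 differs per run):

import org.apache.spark.sql.functions.lit

val df = spark.range(5).select(lit(3) * 2)

// The optimized logical plan shows the pre-computed literal,
// e.g. Project [6 AS (3 * 2)#...], instead of re-evaluating 3 * 2 for each row
df.explain(true)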


In the optimized logical plan & physical plan it says “Project [6 AS (3 * 2)#155]”. The Spark SQL execution goes through the Catalyst phases of analysis, logical optimisation (where rules like constant folding are applied), physical planning and code generation:

source: https://www.learntospark.com/2020/02/spark-sql-catalyst-optimizer.html

Q18. What is a UDF? Is it a good practice to use them?
A18. UDF stands for “User Defined Function”, a custom function you define and register to apply to DataFrame columns in Spark.
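For example, a simple uppercase UDF (a sketch; the names DataFrame and its name column are illustrative):

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._ // assumes an active SparkSession named spark

// The UDF body is opaque to the Catalyst optimizer
val toUpperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)

val names = Seq("john", "jane").toDF("name")
names.select(toUpperUdf(col("name")).as("name_upper")).show()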


Favour Spark SQL built-in functions over UDFs, because Spark treats a UDF as a black box, which results in losing many optimisations such as predicate pushdown, constant folding and others.

The above can be achieved via the built-in function “org.apache.spark.sql.functions.upper”:
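Continuing the same sketch:

import org.apache.spark.sql.functions.{col, upper}

// Same result, but the built-in function stays visible to the optimizer
names.select(upper(col("name")).as("name_upper")).show()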

Avoiding UDFs is not always possible, as not all functionality exists in the built-in Apache Spark functions.

Column Projection

Q19. What is Column Projection in Spark?
A19. “Column projection” (i.e. selection) means reading only the required columns and skipping the rest. For example, if you have a table with 100 columns and your query requires only ten of them, then select only those ten columns.

Column-oriented data formats like Parquet naturally store data in a columnar fashion, which saves on disk I/O when only a subset of columns is read.
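A sketch (illustrative path and column names):

// Only the projected columns are read from the Parquet files; the ReadSchema
// entry in the physical plan confirms the pruned schema
val customers = spark.read.parquet("/my/base/folder")
  .select("customer_id", "first_name", "last_name")

customers.explain(true)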

