Spark interview Q&As with coding examples in Scala – part 05: Transformations, actions, pipelining & shuffling

This extends Spark interview Q&As with coding examples in Scala – part 4 with more coding examples in a Databricks notebook.

Prerequisite: Create a free account as per Databricks getting started. Log in to community.cloud.databricks.com, and click on “Clusters” to create a Spark cluster.

Given the below Data:

Databricks Employee Data
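If you want to follow along, below is a minimal sketch that creates the "/employees" Parquet data used throughout this post. The values mirror the employee table above; writing with repartition(4) is just an assumption so that the partition counts shown in Q36 line up.

%scala
import spark.implicits._

// Sample employee rows matching the data shown above
val employees = Seq(
  ("Peter", 25, 35000.0, "ENGINEERING"),
  ("John",  34, 45000.0, "SALES"),
  ("Rob",   26, 42000.0, "FINANCE"),
  ("Sam",   34, 25000.0, "ENGINEERING")
).toDF("name", "age", "salary", "department")

// Write 4 files so that reading "/employees" typically yields 4 partitions
employees.repartition(4).write.mode("overwrite").parquet("/employees")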

Spark Transformations Vs. Actions

Q31. What will be the output of the below code?

%scala
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Row

val df1 = spark.read.parquet("/employees") 
val df2 = df1.groupBy($"department").agg(sum($"salary").alias("dept_total_salary"))

val df3 = df2.filter($"dept_total_salary" > 42000.00)

val df4 = df3.map(r => r.getDouble(1) * 1.1 )
val result:Array[Double]  =  df4.collect()     // action that triggers the above transformations

val count = df4.count()                        // action that re-triggers the above transformations

Note: r.getAs[Double]("dept_total_salary") is an alternative to r.getDouble(1).

A31. The output will be:

df1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]
df2: org.apache.spark.sql.DataFrame = [department: string, dept_total_salary: double]
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [department: string, dept_total_salary: double]
df4: org.apache.spark.sql.Dataset[Double] = [value: double]
result: Array[Double] = Array(66000.0, 49500.00000000001)
count: Long = 2

Q32. What do you understand by the terms “transformations” & “actions”? Identify the transformations & actions in the above code.
A32. Spark has two types of operations.

1) Transformations are operations applied on a DataFrame to create a new DataFrame. In the above code groupBy, filter & map are transformations.

2) Actions are operations applied on a DataFrame that instruct Spark to perform the computation and send the result back to the driver. In the above code collect() & count() are actions.

Important: Transformations are lazy in nature, which means they get executed only when we call an action like reduce(), show(), count(), collect(), etc. If you comment out the lines below, the transformations will NOT be executed.


//....
val result:Array[Double]  =  df4.collect()
val count = df4.count()
//...
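One way to see this laziness for yourself is explain(), which only prints the query plan and does not run a job. A small sketch reusing df3 & df4 from Q31:

%scala
// Prints the physical plan; no Spark job is triggered
df4.explain()

// Only an action such as collect() actually executes the transformations
val result: Array[Double] = df4.collect()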

Q33. Is count($"department") in the below query a transformation or an action?


val df2 = df1.groupBy($"department").agg(count($"department").alias("dept_count"))  

A33. It is a transformation. groupBy on the DataFrame returns a RelationalGroupedDataset, and count($"department") is an aggregate function applied via agg, which returns a new DataFrame; no data is brought back to the driver, unlike the action Dataset.count().
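To contrast the two, here is a small sketch reusing df1 and the imports from Q31: the count(...) column function inside agg only builds a new DataFrame, whereas Dataset.count() runs a job and returns a value to the driver.

%scala
// Transformation: builds a new DataFrame, nothing is executed yet
val deptCounts = df1.groupBy($"department")
                    .agg(count($"department").alias("dept_count"))

// Action: triggers a job and returns a Long to the driver
val totalRows: Long = df1.count()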

Q34. Given the below operations, which ones are transformations & which ones are actions?

flatMap, mapValues, mapPartitions, sample, union, join, distinct, coalesce, getNumPartitions & reduce

A34. Except for getNumPartitions & reduce, the rest are transformation operations.
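A quick sketch applying a few of these operations to an RDD built from the salary column of df1 (from Q31); the variable names here are purely illustrative.

%scala
// Build an RDD[Double] from the salary column
val salaries = df1.select($"salary").rdd.map(_.getDouble(0))

val withBonus = salaries.flatMap(s => Seq(s, s * 1.1))   // transformation
val combined  = salaries.union(withBonus).distinct()     // transformations
val numParts  = combined.getNumPartitions                // returns the number of partitions
val total     = combined.reduce(_ + _)                   // action: runs the job & returns a Double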

Pipelining Vs Shuffling

Q35. What do you understand by the terms narrow & wide transformations?
A35. Narrow transformations are those where each input partition contributes to only one output partition. For example, map, flatMap, filter, sample, union, mapPartitions, etc. are narrow transformations.

[Spark Narrow Vs. Wide transformations source: https://stackoverflow.com/questions/42799322/how-spark-realize-which-rdd-operation-need-to-be-split-into-seperate-stage]

Wide transformations have input partitions contributing to many output partitions. This is known as a shuffle, where Spark will exchange partitions across the cluster. For example, join, distinct, reduceByKey, groupByKey, repartition, intersection, etc. are wide transformations (coalesce shuffles only when its shuffle flag is set to true; by default it avoids a full shuffle).

With narrow transformations, Spark will automatically perform an operation called pipelining on narrow dependencies, which means that if you specify multiple filters on a DataFrame, they will all be performed in-memory within a single stage. The same cannot be said for shuffles: when you perform a shuffle, Spark may write intermediate results to disk.

Shuffling creates new stages. More stages mean more data shuffling, which can lead to performance issues.

Spark Pipelining Vs Shuffling. Shuffling creates new stages
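You can see where the shuffle (and hence a new stage) happens by looking at the physical plan: an Exchange node marks the shuffle boundary. A small sketch reusing df1 and the imports from Q31:

%scala
// Narrow transformations only: no Exchange node, the filters are pipelined in one stage
df1.filter($"salary" > 30000.0)
   .filter($"age" > 25)
   .explain()

// groupBy causes a shuffle: the plan contains an Exchange (hashpartitioning by department)
df1.groupBy($"department")
   .agg(sum($"salary").alias("dept_total_salary"))
   .explain()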

Q36. How will you identify contents & partitions of each Dataframe?
A36. You can use show() to inspect the contents of each DataFrame and rdd.getNumPartitions to get its number of partitions.


%scala
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Row

val df1 = spark.read.parquet("/employees") 

println("partitions: " + df1.rdd.getNumPartitions)
df1.show()


val df2 = df1.groupBy($"department")
             .agg(sum($"salary").alias("dept_total_salary"))

println("partitions: " + df2.rdd.getNumPartitions)
df2.show()

val df3 = df2.filter($"dept_total_salary" > 42000.00)

println("partitions: " + df3.rdd.getNumPartitions)
df3.show()

val df4 = df3.map(r => r.getAs[Double]("dept_total_salary") * 1.1 )

println("partitions: " + df4.rdd.getNumPartitions)
df4.show()

Output:

partitions: 4
+-----+---+-------+-----------+
| name|age| salary| department|
+-----+---+-------+-----------+
|Peter| 25|35000.0|ENGINEERING|
| John| 34|45000.0|      SALES|
|  Rob| 26|42000.0|    FINANCE|
|  Sam| 34|25000.0|ENGINEERING|
+-----+---+-------+-----------+

partitions: 1
+-----------+-----------------+
| department|dept_total_salary|
+-----------+-----------------+
|ENGINEERING|          60000.0|
|      SALES|          45000.0|
|    FINANCE|          42000.0|
+-----------+-----------------+

partitions: 1
+-----------+-----------------+
| department|dept_total_salary|
+-----------+-----------------+
|ENGINEERING|          60000.0|
|      SALES|          45000.0|
+-----------+-----------------+

partitions: 1
+-----------------+
|            value|
+-----------------+
|          66000.0|
|49500.00000000001|
+-----------------+

Spark Wide & Narrow Transformations with partitions

Q37. What will be the output & no. of partitions for the below code? Why is BigDecimal used?


%scala
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Row

val df1 = spark.read.parquet("/employees") 

println("partitions: " + df1.rdd.getNumPartitions)
df1.show()

val df2 = df1.map(r => (r.getAs[String]("department"),  (BigDecimal(r.getAs[Double]("salary")) * BigDecimal(1.1)).doubleValue()))  // creates a tuple with values as _1, _2, etc

println("partitions: " + df2.rdd.getNumPartitions)
df2.show()

val df3 = df2.groupBy($"_1")
             .agg(sum($"_2").alias("dept_total_salary"))
             .withColumnRenamed("_1", "department")

println("partitions: " + df3.rdd.getNumPartitions)
df3.show()

val df4 = df3.filter($"dept_total_salary" > 46200.00)

println("partitions: " + df4.rdd.getNumPartitions)
df4.show()

A37. The above code will have the below output:

partitions: 4
+-----+---+-------+-----------+
| name|age| salary| department|
+-----+---+-------+-----------+
|Peter| 25|35000.0|ENGINEERING|
| John| 34|45000.0|      SALES|
|  Rob| 26|42000.0|    FINANCE|
|  Sam| 34|25000.0|ENGINEERING|
+-----+---+-------+-----------+

partitions: 4
+-----------+-------+
|         _1|     _2|
+-----------+-------+
|ENGINEERING|38500.0|
|      SALES|49500.0|
|    FINANCE|46200.0|
|ENGINEERING|27500.0|
+-----------+-------+

partitions: 1
+-----------+-----------------+
| department|dept_total_salary|
+-----------+-----------------+
|ENGINEERING|          66000.0|
|      SALES|          49500.0|
|    FINANCE|          46200.0|
+-----------+-----------------+

partitions: 1
+-----------+-----------------+
| department|dept_total_salary|
+-----------+-----------------+
|ENGINEERING|          66000.0|
|      SALES|          49500.0|
+-----------+-----------------+

Spark Narrow Vs Wide Transformations

Q. Why use BigDecimal?
A. In A36 you can see that the output for department SALES is 49500.00000000001 and NOT 49500.0. This is because Double is a binary floating-point type, so multiplying values like 45000.0 * 1.1 introduces a small rounding error. Hence, you should first convert to BigDecimal and then perform the multiplication in decimal, as shown above.
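A quick illustration of the difference in plain Scala, using the same SALES figures:

%scala
// Double multiplication picks up a binary floating-point rounding error
val asDouble = 45000.0 * 1.1                                              // 49500.00000000001

// BigDecimal multiplies in decimal, then converts back to Double
val asBigDecimal = (BigDecimal(45000.0) * BigDecimal(1.1)).doubleValue()  // 49500.0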

Q. What other recommendations will you make to the above code?
A. Caching df1 will prevent the data from being read from "/employees" multiple times, since each action would otherwise re-trigger the full read.


...
val df1 = spark.read.parquet("/employees") 
df1.cache()
...
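A sketch of how you might use it in this notebook (reusing the imports from the earlier cells): cache() is lazy, so the first action materialises the cache, and unpersist() frees the memory when you are done. The count() call here is just one way to force materialisation.

%scala
val df1 = spark.read.parquet("/employees").cache()   // mark for caching (lazy)
df1.count()                                          // first action materialises the cache

// Subsequent actions on df1 (and DataFrames derived from it) read from memory
df1.groupBy($"department").agg(sum($"salary")).show()

df1.unpersist()                                      // release the cached data when done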

Q. Where will you look up function names like "withColumnRenamed"?
A. In the API docs. Google for "Spark 3 API", and select "API Docs" -> "Scala". This will take you to the classes & their methods.

Apache Spark API

RDDs and Datasets are type safe, which means the compiler knows the columns and each column's data type, whether it is Long, String, etc. From Spark 2.0 onwards:

DataFrame = Dataset[Row]

withColumnRenamed is a method on org.apache.spark.sql.Dataset[Row]. If you drill down into this class you will see all the methods supported by a Dataset.

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html
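A small sketch of the DataFrame = Dataset[Row] relationship using the employee schema from this post; the Employee case class is introduced here purely for illustration.

%scala
import spark.implicits._

// Matches the schema of "/employees" used throughout this post
case class Employee(name: String, age: Int, salary: Double, department: String)

val df: org.apache.spark.sql.DataFrame = spark.read.parquet("/employees") // i.e. Dataset[Row]
val ds: org.apache.spark.sql.Dataset[Employee] = df.as[Employee]          // typed Dataset

// Typed (compile-time checked) field access on the Dataset
ds.map(e => e.salary * 1.1).show()

// Untyped (string-based) column access on the DataFrame
df.withColumnRenamed("salary", "base_salary").show()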

The Dataset API docs group its methods into the following categories:

Actions

Basic Dataset functions

Streaming & Typed Transformations

Untyped Transformations, which is where you will see agg(…), withColumnRenamed(..), etc.
