Prerequisite: this post builds on Databricks getting started – Spark, Shell, SQL.
What is a UDF?
A user-defined function (UDF) is a feature of Spark SQL that lets you define new Column-based functions, extending the vocabulary of Spark SQL’s DSL for transforming Datasets.
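As a quick illustration (a minimal sketch; the DataFrame df and its name column are hypothetical), wrapping an ordinary Python function with udf() turns it into a Column-based function you can use in select or withColumn:

```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Wrap a plain Python lambda so Spark can apply it to a Column
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())

# df is a hypothetical DataFrame with a string column "name"
df.select(to_upper(col("name")).alias("name_upper")).show()
```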
Step 1: Create a new Notebook in Databricks, and choose Python as the language.
Step 2: The data has already been uploaded and the table created (see the prerequisite post). The employee.csv file contains:
```
emp_id,emp_name,emp_city,emp_salary
1, John, Sydney, 35000.00
2, Peter, Melbourne, 45000.00
3, Sam, Sydney,55000.00
```
Step 3: Let’s create a UDF that calculates a 10% bonus on “emp_salary”.
```python
employees_df = spark.read.csv("/FileStore/tables/employee.csv",
                              header="true", inferSchema="true",
                              ignoreLeadingWhiteSpace="true",
                              ignoreTrailingWhiteSpace="true")

from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType

# A plain Python function that calculates the bonus
def bonus(salary, bonus_rate):
    return salary * bonus_rate

# Register the function for SQL; the returned object is a UDF usable on Columns
bonus_udf = spark.udf.register("bonus", bonus, DoubleType())

employees_df = employees_df.withColumn(
    "emp_bonus", bonus_udf(employees_df.emp_salary, lit(0.10))
)

employees_df.show()
```
Output:
```
+------+--------+---------+----------+---------+
|emp_id|emp_name| emp_city|emp_salary|emp_bonus|
+------+--------+---------+----------+---------+
|     1|    John|   Sydney|   35000.0|   3500.0|
|     2|   Peter|Melbourne|   45000.0|   4500.0|
|     3|     Sam|   Sydney|   55000.0|   5500.0|
+------+--------+---------+----------+---------+
```
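Note that a simple multiplication like this does not strictly need a UDF; built-in Column arithmetic produces the same result and generally runs faster because it avoids shipping rows to a Python worker. A minimal sketch of the non-UDF equivalent:

```python
# Same emp_bonus column, computed with built-in Column arithmetic instead of a UDF
employees_df = employees_df.withColumn("emp_bonus", employees_df.emp_salary * 0.10)
employees_df.show()
```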
Spark SQL
Step 4: Let’s query the same data Spark SQL style.
```python
employees_df = spark.read.csv("/FileStore/tables/employee.csv",
                              header="true", inferSchema="true",
                              ignoreLeadingWhiteSpace="true",
                              ignoreTrailingWhiteSpace="true")

# Register the DataFrame as a temporary view
employees_df.createOrReplaceTempView("employee_vw")

out = spark.sql("select s.* from employee_vw s")
display(out)
```
Output: the same three employee rows, rendered as a table by display().
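Once the view is registered, any Spark SQL statement can run against it. For instance, a small sketch that filters on emp_city:

```python
# Filter the registered view with ordinary SQL
sydney_df = spark.sql(
    "select emp_name, emp_salary from employee_vw where emp_city = 'Sydney'"
)
display(sydney_df)
```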
Spark SQL with UDF
```python
employees_df = spark.read.csv("/FileStore/tables/employee.csv",
                              header="true", inferSchema="true",
                              ignoreLeadingWhiteSpace="true",
                              ignoreTrailingWhiteSpace="true")

# Register the DataFrame as a temporary view
employees_df.createOrReplaceTempView("employee_vw")

from pyspark.sql.types import DoubleType

def bonus(salary, bonus_rate):
    return salary * bonus_rate

# Register the UDF so it can be called from SQL
spark.udf.register("bonus", bonus, DoubleType())

out = spark.sql(
    "select s.*, bonus(s.emp_salary, cast(0.10 as double)) as emp_bonus from employee_vw s"
)
display(out)
```
This gives you the same output as Step 3.
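If you want to confirm the UDF is visible to the SQL engine, one way (a small sketch using the catalog API) is to list the functions registered in the current session:

```python
# Look up "bonus" among the functions registered in this session
print([f.name for f in spark.catalog.listFunctions() if f.name == "bonus"])
```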
UDF via lambda function
Step 5: Let’s convert our function to a Python lambda function.
```python
employees_df = spark.read.csv("/FileStore/tables/employee.csv",
                              header="true", inferSchema="true",
                              ignoreLeadingWhiteSpace="true",
                              ignoreTrailingWhiteSpace="true")

# Register the DataFrame as a temporary view
employees_df.createOrReplaceTempView("employee_vw")

# Register a lambda as the UDF; "double" is the return type
spark.udf.register("bonus", lambda salary, bonus_rate: salary * bonus_rate, "double")

out = spark.sql(
    "select s.*, bonus(s.emp_salary, cast(0.10 as double)) as emp_bonus from employee_vw s"
)
display(out)
```
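One thing to keep in mind with lambda-based UDFs: a Python UDF receives None for SQL NULL inputs, so if emp_salary could ever be null, guard for it. A hedged sketch (bonus_safe is just an illustrative name):

```python
# Null-safe variant: return None when the salary is missing
spark.udf.register(
    "bonus_safe",
    lambda salary, bonus_rate: salary * bonus_rate if salary is not None else None,
    "double",
)
```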
groupBy, Aggregate & Alias
Step 6: Let’s group by “emp_city”, calculate the average salary, and alias it as “emp_avg_salary_by_city”.
```python
employees_df = spark.read.csv("/FileStore/tables/employee.csv",
                              header="true", inferSchema="true",
                              ignoreLeadingWhiteSpace="true",
                              ignoreTrailingWhiteSpace="true")

from pyspark.sql.functions import avg

# Group by city, average the salary, and alias the result
employees_df = employees_df.groupBy("emp_city") \
    .agg(avg("emp_salary").alias("emp_avg_salary_by_city"))

employees_df.show()
```
Output:
```
+---------+----------------------+
| emp_city|emp_avg_salary_by_city|
+---------+----------------------+
|   Sydney|               45000.0|
|Melbourne|               45000.0|
+---------+----------------------+
```
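agg() also accepts several aggregate expressions at once, each with its own alias. A sketch (re-reading the CSV first, since the cell above overwrote employees_df with the aggregated frame; summary_df is an illustrative name):

```python
from pyspark.sql import functions as F

# Re-read the raw rows, since employees_df now holds the aggregate
employees_raw_df = spark.read.csv("/FileStore/tables/employee.csv",
                                  header="true", inferSchema="true",
                                  ignoreLeadingWhiteSpace="true",
                                  ignoreTrailingWhiteSpace="true")

# Several aggregates in a single agg() call, each aliased
summary_df = employees_raw_df.groupBy("emp_city").agg(
    F.avg("emp_salary").alias("emp_avg_salary_by_city"),
    F.max("emp_salary").alias("emp_max_salary_by_city"),
    F.count("*").alias("emp_count_by_city"),
)
summary_df.show()
```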
You can rename a single column with “withColumnRenamed”:
```python
employees_df = employees_df.withColumnRenamed("emp_city", "employee_city")
employees_df.show()
```
Output:
```
+-------------+----------------------+
|employee_city|emp_avg_salary_by_city|
+-------------+----------------------+
|       Sydney|               45000.0|
|    Melbourne|               45000.0|
+-------------+----------------------+
```
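withColumnRenamed renames one column per call; to rename several columns you can chain calls or select with aliases. A small sketch of the alias approach on the frame above (renamed_df is an illustrative name):

```python
# Rename multiple columns in one pass by selecting with aliases
renamed_df = employees_df.select(
    employees_df.employee_city.alias("city"),
    employees_df.emp_avg_salary_by_city.alias("avg_salary"),
)
renamed_df.show()
```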