This tutorial extends Getting started with Databricks. You can run these examples in a Databricks notebook.
Step 1: Log in to the Databricks notebook environment at
https://community.cloud.databricks.com/login.html.
Step 2: Create a cluster; it will take a few minutes to come up. Note that the cluster will shut down after 2 hours.
Step 3: Given the user ratings for various marketing campaigns, how would you go about dividing the ratings into 5 buckets and then selecting the top bucket?
from pyspark.sql.types import *

# Sample data: (user_id, campaign_id, campaign_name, user_rating)
campaign_list = (
    (1, 101, "Online Campaign", 4),
    (1, 102, "Celebrity Campaign", 5),
    (1, 103, "TV Campaign", 3),
    (2, 102, "Celebrity Campaign", 4),
    (2, 104, "Radio Campaign", 5),
    (3, 101, "Online Campaign", 2),
    (4, 101, "Online Campaign", 4),
    (5, 102, "Celebrity Campaign", 5),
    (6, 103, "TV Campaign", 3),
    (7, 102, "Celebrity Campaign", 3),
    (8, 105, "Movies Campaign", 3),
    (9, 106, "Adwords Campaign", 2),
    (10, 104, "Radio Campaign", 5),
    (11, 101, "Online Campaign", 2),
    (12, 101, "Online Campaign", 4),
    (13, 102, "Celebrity Campaign", 5),
    (14, 103, "TV Campaign", 3),
    (15, 102, "Celebrity Campaign", 3),
    (16, 105, "Movies Campaign", 1),
    (17, 107, "Word by mouth", 4),
    (18, 102, "Celebrity Campaign", 3),
    (19, 105, "Movies Campaign", 3),
    (20, 106, "Adwords Campaign", 4),
    (21, 105, "Movies Campaign", 1),
    (22, 107, "Word by mouth", 4),
    (23, 102, "Celebrity Campaign", 3),
    (24, 108, "News Paper", 3),
    (25, 108, "News Paper", 4),
)

# Explicit schema: none of the columns are nullable
data_schema = [StructField('user_id', IntegerType(), False),
               StructField('campaign_id', IntegerType(), False),
               StructField('campaign_name', StringType(), False),
               StructField('user_rating', IntegerType(), False)]
final_struc = StructType(fields=data_schema)

# Distribute the data as an RDD, then apply the schema
rdd = spark.sparkContext.parallelize(campaign_list)
campaign_df = spark.createDataFrame(rdd, final_struc)
campaign_df.show()
Outputs:
+-------+-----------+------------------+-----------+
|user_id|campaign_id|     campaign_name|user_rating|
+-------+-----------+------------------+-----------+
|      1|        101|   Online Campaign|          4|
|      1|        102|Celebrity Campaign|          5|
|      1|        103|       TV Campaign|          3|
|      2|        102|Celebrity Campaign|          4|
|      2|        104|    Radio Campaign|          5|
|      3|        101|   Online Campaign|          2|
|      4|        101|   Online Campaign|          4|
|      5|        102|Celebrity Campaign|          5|
|      6|        103|       TV Campaign|          3|
|      7|        102|Celebrity Campaign|          3|
|      8|        105|   Movies Campaign|          3|
|      9|        106|  Adwords Campaign|          2|
|     10|        104|    Radio Campaign|          5|
|     11|        101|   Online Campaign|          2|
|     12|        101|   Online Campaign|          4|
|     13|        102|Celebrity Campaign|          5|
|     14|        103|       TV Campaign|          3|
|     15|        102|Celebrity Campaign|          3|
|     16|        105|   Movies Campaign|          1|
|     17|        107|     Word by mouth|          4|
+-------+-----------+------------------+-----------+
only showing top 20 rows
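As a side note, spark.createDataFrame can also consume the Python collection directly, so the explicit parallelize step is optional. A minimal sketch, assuming the campaign_list and final_struc defined above:

# Sketch: build the same DataFrame without creating the RDD by hand
campaign_df = spark.createDataFrame(list(campaign_list), schema=final_struc)
campaign_df.show(5)  # peek at the first 5 rows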
Subquery approach
One way to get the desired output is with subqueries, as shown below. Subqueries can be clumsy to read because they force you to work from the inside out to follow the logic.
Step 4: NTILE is an analytic (window) function that divides an ordered data set into the specified number of buckets and assigns the appropriate bucket number to each row.
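To see how the bucketing behaves before applying it to the campaign data, here is a toy sketch you can run in the same notebook: ntile(4) over nine ordered rows yields buckets of sizes 3, 2, 2, 2, because the row count does not divide evenly.

# Toy illustration of ntile(): split 9 ordered rows into 4 buckets
spark.sql("""
    SELECT id, ntile(4) OVER (ORDER BY id) AS bucket
    FROM range(9)
""").show()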
Three queries are nested, from the inside out:
1) Compute the average rating by campaign_id & campaign_name.
2) Divide the output into 5 buckets by average rating in descending order.
3) Filter the top bucket.
campaign_df.createOrReplaceTempView("campaign_rating")  # create a temp view so SQL can query the DataFrame

df_average_campaign_rating = spark.sql("""
    SELECT *
    FROM (SELECT *, ntile(5) OVER (ORDER BY avg_rating DESC) AS rating_rank
          FROM (SELECT campaign_id, campaign_name, avg(user_rating) AS avg_rating
                FROM campaign_rating
                GROUP BY 1, 2) dt)
    WHERE rating_rank = 1
""")
df_average_campaign_rating.show()
Outputs:
+-----------+--------------+----------+-----------+
|campaign_id| campaign_name|avg_rating|rating_rank|
+-----------+--------------+----------+-----------+
|        104|Radio Campaign|       5.0|          1|
|        107| Word by mouth|       4.0|          1|
+-----------+--------------+----------+-----------+
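For comparison, the same result can be produced without SQL by staying in the DataFrame API, using pyspark.sql.functions.ntile over a window. A sketch, assuming the campaign_df built in Step 3 (the names w and df_top_bucket are my own):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# No partitionBy: every row flows through a single window partition,
# which is fine for a data set this small
w = Window.orderBy(F.desc("avg_rating"))

df_top_bucket = (campaign_df
    .groupBy("campaign_id", "campaign_name")
    .agg(F.avg("user_rating").alias("avg_rating"))
    .withColumn("rating_rank", F.ntile(5).over(w))
    .filter("rating_rank = 1"))
df_top_bucket.show()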
Common Table Expression (CTE) approach
CTEs solve two key problems:
1) The “logic on top of logic” problem, where you want to run a data manipulation on top of the result of another data manipulation, as the subqueries above demonstrate.
2) Readability: naming each intermediate result avoids the clumsy nested queries shown above.
Step 5: Run the query directly as SQL.
%sql
WITH avg_rating AS (
  SELECT campaign_id, campaign_name, avg(user_rating) AS avg_rating
  FROM campaign_rating
  GROUP BY 1, 2
),
rating_ranked AS (
  SELECT *, ntile(5) OVER (ORDER BY avg_rating DESC) AS rating_rank
  FROM avg_rating
)
SELECT * FROM rating_ranked WHERE rating_rank = 1;
Outputs:
+-----------+--------------+----------+-----------+
|campaign_id| campaign_name|avg_rating|rating_rank|
+-----------+--------------+----------+-----------+
|        104|Radio Campaign|         5|          1|
|        107| Word by mouth|         4|          1|
+-----------+--------------+----------+-----------+
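The %sql results grid here renders the averages as 5 and 4 while .show() printed 5.0 and 4.0; presumably this is only a display difference, since avg() returns a double either way. If you want a fixed precision regardless of how the result is displayed, you could round the average; a sketch:

# Sketch: round the average so both display paths agree on formatting
spark.sql("""
    SELECT campaign_id, campaign_name,
           round(avg(user_rating), 2) AS avg_rating
    FROM campaign_rating
    GROUP BY 1, 2
    ORDER BY avg_rating DESC
""").show()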
Step 6: Run the same query with Spark SQL in a Python notebook.
%python
query = '''
WITH avg_rating AS (
  SELECT campaign_id, campaign_name, avg(user_rating) AS avg_rating
  FROM campaign_rating
  GROUP BY 1, 2
),
rating_ranked AS (
  SELECT *, ntile(5) OVER (ORDER BY avg_rating DESC) AS rating_rank
  FROM avg_rating
)
SELECT * FROM rating_ranked WHERE rating_rank = 1
'''

df_average_campaign_rating = spark.sql(query)
df_average_campaign_rating.show()
Outputs:
+-----------+--------------+----------+-----------+
|campaign_id| campaign_name|avg_rating|rating_rank|
+-----------+--------------+----------+-----------+
|        104|Radio Campaign|       5.0|          1|
|        107| Word by mouth|       4.0|          1|
+-----------+--------------+----------+-----------+
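Finally, if you need the winning campaigns back in plain Python, for example to feed a downstream report, collect() brings the small result set to the driver. A sketch, assuming the df_average_campaign_rating from Step 6 (top_campaigns is my own name):

# Sketch: materialize the top-bucket campaign names as a Python list
top_campaigns = [row["campaign_name"] for row in df_average_campaign_rating.collect()]
print(top_campaigns)  # e.g. ['Radio Campaign', 'Word by mouth']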