This tutorial extends "Setting up Spark and Scala with Maven".
Step 1: Let's take a simple example of joining students to their departments. In SQL, this would be written as:
SELECT s.name, d.name
FROM Student s
JOIN Department d ON s.deptId = d.id
Step 2: Let's create case classes to represent the Student and Department data.
case class Student(id: String, name: String, deptId: String)
case class Department(id: String, name: String)
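Case classes are a natural fit here: instances are immutable, get structural equality and a readable toString for free, are serializable by default (Spark ships them to executors), and are constructed without new. A quick REPL-style sketch:

val s = Student("ST-01", "John", "DPT-01")
s.name                                    // "John"
s == Student("ST-01", "John", "DPT-01")   // true: compared field by field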
Step 3: The Spark job with an RDD join.
Q. Why join with a department?
A. To get the department name.
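An RDD join works on key/value pair RDDs: elements whose keys are equal are matched, and each match comes back as (key, (left, right)). That nested tuple is why the job below reads the student as x._2._1 and the department as x._2._2. The (simplified) signature from PairRDDFunctions:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]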
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object JoinsInSpark {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Join Data")
      .setMaster("local") // local mode
    val sc = new SparkContext(conf)

    // could be read from a CSV file in HDFS
    val students = List(Student("ST-01", "John", "DPT-01"),
      Student("ST-02", "Peter", "DPT-01"),
      Student("ST-03", "Sam", "DPT-02"))

    // could be read from a CSV file in HDFS
    val departments = List(Department("DPT-01", "Science"),
      Department("DPT-02", "Maths"))

    // create RDDs
    val studentRDD = sc.parallelize(students)
    val departmentRDD = sc.parallelize(departments)

    // key/value pairs, keyed on deptId
    val studentsByDeptId: RDD[(String, Student)] = studentRDD.keyBy(_.deptId)
    val departmentByDeptId: RDD[(String, Department)] = departmentRDD.keyBy(_.id)

    // let's join by department id
    val studentsJoinedToDepartment = studentsByDeptId.join(departmentByDeptId)

    // each element is a tuple: (deptId, (Student, Department))
    studentsJoinedToDepartment.foreach { x =>
      val studentDetail: Student = x._2._1
      val departmentDetail: Department = x._2._2
      println(studentDetail.name + " is enrolled in " + departmentDetail.name)
    }

    sc.stop()
  }
}
Output (Spark's INFO logging trimmed for brevity):
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/29 12:48:51 INFO SparkContext: Running Spark version 1.3.0
...
16/10/29 12:48:57 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/10/29 12:48:57 INFO SparkContext: Starting job: foreach at JoinsInSpark.scala:39
...
John is enrolled in Science
Peter is enrolled in Science
Sam is enrolled in Maths
...
16/10/29 12:48:57 INFO DAGScheduler: Job 0 finished: foreach at JoinsInSpark.scala:39, took 0.412082 s
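One closing note: join is an inner join, so a student whose deptId matches no department is silently dropped. To keep such students, leftOuterJoin wraps the right-hand side in an Option. A minimal sketch, reusing the pair RDDs from the job above:

// yields RDD[(String, (Student, Option[Department]))]
val leftJoined = studentsByDeptId.leftOuterJoin(departmentByDeptId)
leftJoined.foreach { case (deptId, (student, maybeDept)) =>
  val deptName = maybeDept.map(_.name).getOrElse("(no department)")
  println(student.name + " is enrolled in " + deptName)
}

rightOuterJoin and fullOuterJoin follow the same pattern, wrapping the left side or both sides in Option.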