This extends Docker Tutorial: Apache Kafka with Python 3 on Cloudera quickstart.

Step 1: Create the PySpark streaming code in Python.

driver.py
(my-app_env) [root@quickstart my-app]# vi driver.py
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

from mypackage import simple

if __name__ == "__main__":
    conf = SparkConf().setAppName("Simple App")
    conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # every 10 seconds

    simple.SimpleSpark().myfunc(ssc)

    ssc.start()
    ssc.awaitTermination()
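The driver pulls SimpleSpark in from mypackage, so when the job is eventually submitted Spark needs both that package and the Kafka streaming classes on its classpath. Purely as a rough sketch (the actual packaging is covered from Step 2 onwards), a local run on the quickstart VM might look like the command below; the spark-streaming-kafka assembly version is an assumption and should match the Spark version shipped with your image, and <path-to-mypackage-egg> stands in for the .egg built later.

(my-app_env) [root@quickstart my-app]# spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.6.0 \
    --py-files <path-to-mypackage-egg> \
    driver.py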
simple.py – receive messages from a topic
(my-app_env) [root@quickstart my-app]# vi mypackage/simple.py
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


class SimpleSpark:

    def myfunc(self, ssc):
        # 2181 is the zookeeper port & 9092 is the kafka broker port
        # 'spark-streaming' is the consumer group id; read MyTestTopic with one receiver thread
        kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'MyTestTopic': 1})
        # each element is a (key, message) pair; keep only the message value
        lines = kafkaStream.map(lambda x: x[1])
        lines.pprint()
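Nothing will show up from pprint() until something publishes to MyTestTopic. The earlier Kafka tutorial covers producing from Python; as a minimal sketch, assuming the kafka-python package is installed in the virtualenv and the broker is listening on localhost:9092, a few test messages can be pushed like this:

from kafka import KafkaProducer

# connect to the local Kafka broker on the quickstart VM
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# send a handful of test messages to the topic the stream reads from
for i in range(10):
    producer.send('MyTestTopic', 'test message {}'.format(i).encode('utf-8'))

producer.flush()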
Build the .egg file

Step 2: Create…