PySpark Databricks Exercise: RDD
The purpose of this practice is to get a deeper understanding of the properties of RDDs. We will not cover what an RDD is or why it matters; there are plenty of materials online with excellent explanations.
```python
# First, as usual, create a Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
```
```python
'''
Internally, each RDD is characterized by five main properties:

 - A list of partitions
 - A function for computing each split
 - A list of dependencies on other RDDs
 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD
   is hash-partitioned)
 - Optionally, a list of preferred locations to compute each split on
   (e.g. block locations for an HDFS file)
'''
```
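Several of these properties can be inspected directly on a live RDD, which makes the list above concrete. A minimal sketch (the sample RDD here is made up for illustration):

```python
# peek at some of the five properties on a concrete RDD
rdd = sc.parallelize(range(10), 4).map(lambda x: (x % 2, x))

rdd.getNumPartitions()                            # the list of partitions -> 4
rdd.partitioner                                   # None: no Partitioner until a shuffle assigns one
rdd.reduceByKey(lambda a, b: a + b).partitioner   # set once a shuffle has run
rdd.toDebugString()                               # the lineage, i.e. dependencies on other RDDs
```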
```python
# 1 CREATE RDD
# 1.1 From a list/tuple/set/dict (a dict contributes its keys only)
# 1.2 From a local file
# 1.3 From another RDD
# 1.4 Create a pair RDD from an RDD
```
```python
# 1.1 question: create an RDD from namelist
namelist = ['Ana', 'Bob']

# 1.1 solution
nameRDD = sc.parallelize(namelist)
nameRDD.collect()
```
Out[48]: ['Ana', 'Bob']
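`parallelize` accepts any Python iterable, which covers the other collection types listed in 1.1; note that a dict contributes its keys only, and the optional second argument fixes the number of partitions. A short sketch:

```python
# parallelize works on tuples, sets, and dicts too
sc.parallelize(('Ana', 'Bob')).collect()          # from a tuple
sc.parallelize({'Ana', 'Bob'}).collect()          # from a set (element order not guaranteed)
sc.parallelize({'Ana': 20, 'Bob': 21}).collect()  # from a dict -> keys only: ['Ana', 'Bob']
sc.parallelize(namelist, 2).getNumPartitions()    # explicit partition count -> 2
```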
```python
# 1.2 question: create an RDD from a file path
# dbutils lists files under a directory; /databricks-datasets/ contains
# datasets provided by Databricks for practice
dbutils.fs.ls("/databricks-datasets/README.md")
filepath = "/databricks-datasets/README.md"

# 1.2 solution
fileRDD = sc.textFile(filepath)
fileRDD.take(5)
```
Out[49]: ['Databricks Hosted Datasets',
'==========================',
'',
'The data contained within this directory is hosted for users to build ',
'data pipelines using Apache Spark and Databricks.']
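`textFile` yields one record per line and takes an optional `minPartitions` hint (a lower bound, not an exact count); `wholeTextFiles` instead yields one `(path, content)` pair per file. A sketch reusing the same README:

```python
# one record per line, with a minimum-partition hint
sc.textFile(filepath, minPartitions=4).getNumPartitions()

# one (path, content) pair per file
sc.wholeTextFiles(filepath).take(1)
```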
```python
# 1.3 question: create an RDD from nameRDD with output ['Ana2nd', 'Bob2nd']
# solution
anotherRDD = nameRDD.map(lambda x: x + '2nd')
anotherRDD.collect()
```
Out[50]: ['Ana2nd', 'Bob2nd']
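Note that `map` is a transformation, so nothing executes until an action such as `collect` runs; chaining transformations only builds up lineage. A quick sketch:

```python
# transformations are lazy: this line does no work yet
lazyRDD = nameRDD.map(lambda x: x + '2nd').filter(lambda x: x.startswith('A'))
lazyRDD.collect()  # ['Ana2nd'] -- the pipeline runs only now
```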
```python
# 1.4 question: create a pair RDD from nameRDD using map,
# output should be [('Ana', 20), ('Bob', 20)]
# solution
pairRDD = nameRDD.map(lambda x: (x, 20))
pairRDD.collect()
```
Out[51]: [('Ana', 20), ('Bob', 20)]
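Pair RDDs unlock the `*ByKey` family of transformations. A minimal sketch with made-up ages, showing how duplicate keys aggregate:

```python
# a few common pair-RDD operations (ages are made up)
agesRDD = sc.parallelize([('Ana', 20), ('Bob', 20), ('Ana', 22)])
agesRDD.reduceByKey(lambda a, b: a + b).collect()  # [('Ana', 42), ('Bob', 20)] (order may vary)
agesRDD.mapValues(lambda v: v + 1).collect()       # transform values, keep keys
agesRDD.groupByKey().mapValues(list).collect()     # [('Ana', [20, 22]), ('Bob', [20])]
```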
```python
# 2. transformations
# 2.1 map vs. flatMap
# 2.1 question: given nameRDD built from [['Ana','Bob'], ['Caren']],
# use map or flatMap to return:
# ans1: ['Ana', 'Bob', 'plus', 'Caren', 'plus']
# ans2: [['Ana', 'Bob', 'plus'], ['Caren', 'plus']]
namelist = [['Ana', 'Bob'], ['Caren']]
nameRDD = sc.parallelize(namelist)

# 2.1 solution
nameRDD.flatMap(lambda x: x + ['plus']).collect()  # ans1: flattens the lists
nameRDD.map(lambda x: x + ['plus']).collect()      # ans2: keeps one list per record
```
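The same distinction drives the classic line-to-word split: `flatMap` flattens the per-line lists into a single RDD of words, while `map` would keep one list per line. A sketch reusing `fileRDD` from 1.2:

```python
# split the README into words; flatMap flattens the per-line lists
wordsRDD = fileRDD.flatMap(lambda line: line.split())
wordsRDD.take(3)  # e.g. ['Databricks', 'Hosted', 'Datasets']
```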
```python
# 2.3 aggregate
# Given a list of numbers, partition it into 3 partitions, then
# 1. use map and reduce to calculate the average
# 2. use aggregate to calculate its average
numRDD = sc.parallelize([1, 2, 3, 4, 5], 3)

# solution
# 1. use map and reduce
acc, cnt = numRDD.map(lambda x: (x, 1)).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
acc / cnt

# 2. use aggregate
zerovalues = (0, 0)
seqOp = lambda acc, t: (acc[0] + t, acc[1] + 1)
combOp = lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1])
acc, cnt = numRDD.aggregate(zerovalues, seqOp, combOp)
acc / cnt
```
Out[13]: 3.0
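Two details worth noting: `seqOp` starts from a fresh copy of `zerovalues` inside each of the 3 partitions, and `combOp` then merges the per-partition `(sum, count)` pairs. As a sanity check, the built-in helpers agree:

```python
# built-in equivalents of the average above
numRDD.mean()                  # 3.0
numRDD.sum() / numRDD.count()  # 3.0
# fold is the special case of aggregate where seqOp and combOp coincide
numRDD.fold(0, lambda a, b: a + b) / numRDD.count()  # 3.0
```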
```python
# 2.4 set operations
# 2.4 question: given two name RDDs, return the
# union, intersection, and cartesian product of the two RDDs
nameRDD1 = sc.parallelize(['Ana', 'Bob'])
nameRDD2 = sc.parallelize(['Bob', 'Caren', 'Deric'])

# 2.4 solution
nameRDD1.union(nameRDD2).collect()
nameRDD1.intersection(nameRDD2).collect()
nameRDD1.cartesian(nameRDD2).collect()
```
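Two behaviors to remember here: `union` keeps duplicates ('Bob' appears twice unless you chain `distinct`), while `intersection` already deduplicates; `subtract` gives the remaining set difference. A sketch:

```python
# union keeps duplicates; distinct() drops the repeated 'Bob'
nameRDD1.union(nameRDD2).distinct().collect()
# names in nameRDD1 but not in nameRDD2 -> ['Ana']
nameRDD1.subtract(nameRDD2).collect()
```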