PySpark Databricks Exercise: RDD Part 1

  • The purpose of this practice is to get a deeper understanding of the properties of RDDs.
  • We will not discuss what an RDD is or what it means; there are plenty of excellent explanations online.

# First, as usual, create a Spark session (on Databricks, spark and sc are usually pre-defined; getOrCreate() returns any existing session)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
'''
Internally, each RDD is characterized by five main properties:

- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
'''
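
Some of these properties can be inspected directly from PySpark. A minimal sketch (the exact output depends on your cluster):

# number of partitions backing the RDD
rdd = sc.parallelize(range(10), 4)
rdd.getNumPartitions()   # 4
# partitioner, if any (None for a plain parallelized RDD)
rdd.partitioner          # None
# lineage/dependencies, rendered as a debug string
print(rdd.toDebugString().decode())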
# 1 CREATE RDD
# 1.1 From a list/tuple/set/dict (for a dict, only the keys are distributed)
# 1.2 From a local file
# 1.3 From another RDD
# 1.4 Create a pair RDD from an RDD
# 1.1 question: create an RDD from namelist
namelist = ['Ana','Bob']
# 1.1 solution
nameRDD = sc.parallelize(namelist)
nameRDD.collect()
Out[48]: ['Ana', 'Bob']
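
Note that parallelizing a dict distributes only its keys (as the outline above says); use items() for key-value pairs. A quick sketch with a hypothetical agedict:

agedict = {'Ana': 20, 'Bob': 21}
sc.parallelize(agedict).collect()                # ['Ana', 'Bob'] -- keys only
sc.parallelize(list(agedict.items())).collect()  # [('Ana', 20), ('Bob', 21)]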
# 1.2 question: create an RDD from a file path
# use dbutils to list files under directories; /databricks-datasets/ contains datasets provided by the system for practice
dbutils.fs.ls("/databricks-datasets/README.md")
filepath = "/databricks-datasets/README.md"
# 1.2 solution
fileRDD = sc.textFile(filepath)
fileRDD.take(5)
Out[49]: ['Databricks Hosted Datasets', '==========================', '', 'The data contained within this directory is hosted for users to build ', 'data pipelines using Apache Spark and Databricks.']
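
textFile reads the file line by line, producing one element per line. If whole files are needed as single elements, wholeTextFiles returns (path, content) pairs instead; a short sketch:

# each element of fileRDD is one line of README.md
fileRDD.count()                                # number of lines
# one (path, full_content) pair per file
sc.wholeTextFiles(filepath).keys().collect()   # the file path(s)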
# 1.3 question: create an RDD from nameRDD whose output is ['Ana2nd', 'Bob2nd']
# solution
anotherRDD = nameRDD.map(lambda x: x + '2nd')
anotherRDD.collect()
Out[50]: ['Ana2nd', 'Bob2nd']
# 1.4 question: create a pair RDD from nameRDD using map; the output should be [('Ana', 20), ('Bob', 20)]
# solution
pairRDD = nameRDD.map(lambda x: (x,20))
pairRDD.collect()
Out[51]: [('Ana', 20), ('Bob', 20)]
# 2. transformations
# 2.1 map vs. flatMap
# 2.1 question: given a nameRDD: [['Ana','Bob'],['Caren']], use map or flatMap to return:
# ans1: ['Ana', 'Bob', 'plus', 'Caren', 'plus']
# ans2: [['Ana', 'Bob', 'plus'], ['Caren', 'plus']]
namelist = (['Ana','Bob'],['Caren'])
nameRDD = sc.parallelize(namelist)
# 2.1 solution
nameRDD.flatMap(lambda x: x + ['plus']).collect()   # ans1: flattened
nameRDD.map(lambda x: x + ['plus']).collect()       # ans2: nesting preserved
Out[56]: [['Ana', 'Bob', 'plus'], ['Caren', 'plus']]
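
The difference is that flatMap flattens the result of the function by one level, while map keeps it nested. A tiny extra illustration:

sc.parallelize([1, 2, 3]).flatMap(lambda x: range(x)).collect()
# [0, 0, 1, 0, 1, 2] -- the three ranges are flattened into one list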
# 2.2 mapValues vs. flatMapValues
# 2.2 question: given a pairRDD: [('Ana', ['A']), ('Bob', ['B']), ('Ana', ['A2'])], use mapValues or flatMapValues to return:
# ans1: [('Ana', ['A', 'plus']), ('Bob', ['B', 'plus']), ('Ana', ['A2', 'plus'])]
# ans2: [('Ana', 'A'),('Ana', 'plus'),('Bob', 'B'),('Bob', 'plus'),('Ana', 'A2'),('Ana', 'plus')]
pairlist = [('Ana',['A']),('Bob',['B']),('Ana',['A2'])]
pairRDD = sc.parallelize(pairlist)
pairRDD.collect()

# 2.2 solutions:
pairRDD.mapValues(lambda x: x + ['plus']).collect()       # ans1
pairRDD.flatMapValues(lambda x: x + ['plus']).collect()   # ans2
Out[66]: [('Ana', 'A'), ('Ana', 'plus'), ('Bob', 'B'), ('Bob', 'plus'), ('Ana', 'A2'), ('Ana', 'plus')]
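
flatMapValues keeps the key and pairs it with every element of the flattened value, which is why each key is duplicated. For illustration only, a rough equivalent written with plain flatMap:

pairRDD.flatMap(lambda kv: [(kv[0], v) for v in kv[1] + ['plus']]).collect()
# same result as flatMapValues above (flatMapValues also preserves the partitioner)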
# 2.3 aggregate
# 2.3 question: given a list of numbers, partition it into 3 partitions, then
# 1. use map and reduce to calculate the average
# 2. use aggregate to calculate the average
numRDD = sc.parallelize([1,2,3,4,5],3)
# solution
# 1. use map and reduce
acc, cnt = numRDD.map(lambda x: (x, 1)).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
acc / cnt
# 2. use aggregate
zeroValue = (0, 0)
seqOp = lambda acc, t: (acc[0] + t, acc[1] + 1)
combOp = lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1])
acc, cnt = numRDD.aggregate(zeroValue, seqOp, combOp)
acc / cnt
Out[13]: 3.0
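
To see why aggregate needs both functions, trace one possible partitioning (the split below is an assumption; glom shows the real one on your cluster):

numRDD.glom().collect()   # e.g. [[1], [2, 3], [4, 5]]
# seqOp runs within each partition: (1, 1), (5, 2), (9, 2)
# combOp merges the partials:       (1+5+9, 1+2+2) = (15, 5)
# average = 15 / 5 = 3.0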
# 2.4 set operations
# 2.4 question: given two name RDDs, return the
# union, intersection and cartesian product of the two RDDs
nameRDD1 = sc.parallelize(['Ana','Bob'])
nameRDD2 = sc.parallelize(['Bob','Caren','Deric'])
# 2.4 solution:
nameRDD1.union(nameRDD2).collect()
nameRDD1.intersection(nameRDD2).collect()
nameRDD1.cartesian(nameRDD2).collect()
Out[20]: [('Ana', 'Bob'), ('Ana', 'Caren'), ('Ana', 'Deric'), ('Bob', 'Bob'), ('Bob', 'Caren'), ('Bob', 'Deric')]
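
Note that union keeps duplicates ('Bob' appears in both RDDs). If set semantics are wanted, distinct and subtract are available; a quick sketch:

nameRDD1.union(nameRDD2).collect()             # ['Ana', 'Bob', 'Bob', 'Caren', 'Deric']
nameRDD1.union(nameRDD2).distinct().collect()  # duplicates removed (order may vary)
nameRDD2.subtract(nameRDD1).collect()          # 'Caren' and 'Deric' (order may vary)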
# 2.5 pair RDD *ByKey operations
pairRDD = sc.parallelize([('Ana',25),('Bob',30),('Deric',40),('Ana',40)])
# 2.5 question: calculate the per-key sum using groupByKey/reduceByKey/foldByKey/combineByKey
# output: [('Deric', 40), ('Ana', 65), ('Bob', 30)]
# 2.5 solution:
pairRDD.groupByKey().map(lambda x: (x[0],sum(x[1]))).collect()
pairRDD.reduceByKey(lambda x,y: x + y).collect()
pairRDD.foldByKey(0,lambda x,y: x + y).collect()
pairRDD.combineByKey(lambda x: x,                    # createCombiner
                     lambda acc, y: acc + y,         # mergeValue (within a partition)
                     lambda a, b: a + b).collect()   # mergeCombiners (across partitions)
Out[33]: [('Deric', 40), ('Ana', 65), ('Bob', 30)]
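
aggregateByKey works here too, and in general the combining operations (reduceByKey, foldByKey, aggregateByKey, combineByKey) are preferable to groupByKey because they merge values on the map side before shuffling. A sketch:

pairRDD.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b).collect()
# same per-key sums as above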
# 2.6 question: calculate the per-key average
# output: [('Deric', 40.0), ('Ana', 32.5), ('Bob', 30.0)]
# solution:

sumrdd = pairRDD.combineByKey(lambda x: (x,1), lambda x,y: (x[0]+y, x[1] + 1), lambda x,y: (x[0]+y[0],x[1]+y[1]))
sumrdd.map(lambda x: (x[0],x[1][0]/x[1][1])).collect()
Out[32]: [('Deric', 40.0), ('Ana', 32.5), ('Bob', 30.0)]
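
Since only the values change in the final step, mapValues is the more idiomatic choice (and it preserves the partitioner):

sumrdd.mapValues(lambda v: v[0] / v[1]).collect()
# [('Deric', 40.0), ('Ana', 32.5), ('Bob', 30.0)]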
# 3. output
# 3.1 output to disk
pairRDD.saveAsTextFile('/yytest/pairrdd.txt')
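
Despite the .txt suffix, saveAsTextFile writes a directory of part files, one per partition, and fails if the path already exists; each element is serialized with str(). Reading the data back therefore yields strings, not tuples:

sc.textFile('/yytest/pairrdd.txt').collect()
# e.g. ["('Ana', 25)", "('Bob', 30)", ...] -- elements come back as strings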