beat365网址官网网站-365体育世界杯专用版-365体育注册送365

Spark RDD及其常用算子介绍

Spark RDD及其常用算子介绍

一、RDD介绍

1.1 什么是RDD

RDD(Resilient Distributed DataSet),称作弹性分布式数据集,是Spark中最基本的数据抽象,表示一个不可变的,分区的,其中元素可以被并行计算的数据集合。所有的数据操作都是建立在RDD这一抽象数据结构上的,就好比我们Java中的List,Set一样,只不过List和Set是在一个JVM进程中的,不是分布式的而已。

1.2 RDD的特点

RDD是分区的;

分区是RDD数据存储的最小单元,一个RDD中的数据会被分为多个分区存储在分布式集群上。

# 指定使用三个分区

>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 3).glom().collect()

[[1,2,3],[4,5,6],[7,8,9]]

# 指定使用六个分区

>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 6).glom().collect()

[[1],[2,3],[4],[5,6],[7],[8,9]]

RDD的运算会作用在其所有的分区上;

我们对RDD中的数据进行运算时,其效果是同时作用在所有的分区上的。

# 对6个分区中的所有数据进行乘10的操作

>>> sc.parallelize([1,2,3,4,5,6,7,8,9],6).map(lambda x: x*10).glom().collect()

[[10], [20, 30], [40], [50, 60], [70], [80, 90]]

RDD之间是有依赖关系的;

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test1").setMaster("local")

sc=SparkContext(conf=conf)

rdd1 = sc.parallelize([1,2,3,4,5,6], 3)

rdd2 = rdd1.map(lambda x : x * 10)

rdd3 = rdd2.map(lambda x : x + 1)

print(rdd3.collect())

其中rdd3依赖rdd2,rdd2又依赖rdd1,它们之间形成了一个依赖链条;

对于K-V形式数据的RDD可以使用自定义分区器;

在K-V类型的数据shuffle过程中,默认的分区规则是Hash分区,我们可以自定义指定使用何种分区规则;

RDD的分区规划会尽量靠近其数据所在的服务器;

Spark会在确保并行计算能力的前提下,尽量让各个分区从本地读取数据,而非通过网络读取数据;

1.3 RDD的创建

要想创建RDD对象,首先需要初始化Spark RDD的程序入口,即SparkContext对象,只有基于它,才能创建出第一个RDD对象。

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test1").setMaster("local")

sc=SparkContext(conf=conf)

# SparkContext的主要作用就是创建出第一个RDD对象

rdd1 = sc.parallelize([1,2,3,4,5,6], 3)

通常我们有两种方式来创建RDD:

通过并行化本地集合创建;

# 将本地集合转为分布式的集合

rdd1 = sc.parallelize([1,2,3,4,5,6], 3)

通过读取外部数据文件创建;

# 读取指定文件(支持HDFS等其它存储)转为分布式集合,分区数会由spark根据设定值自行调整

rdd1 = sc.textFile("./word.txt", 100)

二、常用算子介绍

分布式集合对象上的API操作被称为算子,算子通常被分为三大类:

2.1 转换算子Transformation

转换算子的结果仍然是一个RDD对象,这类算子是懒加载的,只有遇到Action算子,它们才会真正进行转换计算;

map,将RDD中的数据一条条进行处理,rdd.map(func)

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test1").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize([1,2,3,4,5])

# 如果方法体中的逻辑比较简单,推荐使用lambda匿名方法的形式

# [10, 20, 30, 40, 50]

print(rdd.map(lambda x : x * 10).collect())

# 如果方法体中的逻辑比较复杂,推荐使用定义方法的形式

def map_func(data):

return (data * 10 + 1)

# [11, 21, 31, 41, 51]

print(rdd.map(map_func).collect())

flatmap,先对RDD进行map操作,然后再解除数据的嵌套,rdd.flatmap(func)

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test2").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize(["a b c", "d e f", "a c f"])

# [['a', 'b', 'c'], ['d', 'e', 'f'], ['a', 'c', 'f']]

print(rdd.map(lambda x : x.split(" ")).collect())

# ['a', 'b', 'c', 'd', 'e', 'f', 'a', 'c', 'f']

print(rdd.flatMap(lambda x : x.split(" ")).collect())

reduceByKey,表示对于K-V类型的数据,先按照key进行分组,然后将各个组的值按照指定的函数进行迭代计算;

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test3").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize([("a",1),("b",1),("a",2),("b",3),("a",2)])

result = rdd.reduceByKey(lambda x,y:x+y)

# [('a', 5), ('b', 4)]

print(result.collect())

需要注意的是,reduceByKey是先在每个分区里面按照key进行分组,然后再进行shuffle,好处就是减少了shuffle的数量,性能上会提高。

groupBy,表示将RDD中的数据按照指定函数的规则进行分组;

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test3").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize([1,2,3,4,5])

result = rdd.groupBy(lambda x:x%2).collect()

# [(1, [1, 3, 5]), (0, [2, 4])]

print([(x,sorted(y)) for (x,y) in result])

Filter,表示将RDD中符合指定过滤条件的数据保留;

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test3").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize([1,2,3,4,5])

result = rdd.filter(lambda x:x%2==1).collect()

# [1, 3, 5]

print(result)

distinct,对RDD中的数据去重;

from pyspark import SparkConf

from pyspark import SparkContext

if __name__ == '__main__':

conf=SparkConf().setAppName("test3").setMaster("local")

sc=SparkContext(conf=conf)

rdd = sc.parallelize([1,1,3,3,5,5,6,1,2,3,3])

result = rdd.distinct()

# [1, 3, 5, 6, 2]

print(result.collect())

union,表示将两个RDD中的数据合并成1个RDD返回;

rdd1 = sc.parallelize([1,2,3,4,5])

rdd2 = sc.parallelize(["a","b","c"])

result = rdd1.union(rdd2)

# [1, 2, 3, 4, 5, 'a', 'b', 'c']

print(result.collect())

join,表示对于K-V类型的数据,可以使用连接操作来返回一个RDD;

# 部门ID和员工姓名

rdd1 = sc.parallelize([(1001,"zhangsan"),(1002,"lisi"),(1003,"wangwu")])

# 部门ID和部门名称

rdd2 = sc.parallelize([(1001,"sales"),(1002,"it"),(1003,"tech")])

result = rdd1.join(rdd2)

# [(1002, ('lisi', 'it')), (1001, ('zhangsan', 'sales')), (1003, ('wangwu', 'tech'))]

print(result.collect())

intersection,表示求两个RDD的交集并返回一个新的RDD;

rdd1 = sc.parallelize([(1001,"zhangsan"),(1002,"lisi"),(1003,"wangwu")])

rdd2 = sc.parallelize([(1001,"zhangwu"),(1002,"lisi"),(1003,"wangliu")])

result = rdd1.intersection(rdd2)

# [(1002, 'lisi')]

print(result.collect())

glom,表示按照分区给RDD中的数据加上嵌套;

rdd = sc.parallelize([1,2,3,4,5])

result = rdd.glom()

# [[1, 2, 3, 4, 5]]

print(result.collect())

rdd2 = sc.parallelize([1,2,3,4,5],3)

result2 = rdd2.glom()

# [[1], [2, 3], [4, 5]]

print(result2.collect())

groupByKey,功能上类似于reduceByKey,但是原理上不同,是将所有分区上的数据都先进行shuffle,性能上不如reduceByKey,适合小数据集时使用;

rdd = sc.parallelize([("a",1),("b",1),("a",2),("b",3),("a",2)])

result = rdd.groupByKey().map(lambda x: (x[0],list(x[1])))

# [('a', 5), ('b', 4)]

# [('a', [1, 2, 2]), ('b', [1, 3])]

print(result.collect())

sortBy,根据指定的规则对RDD中的数据进行排序;

data = [("a",3),("d",1),("e",2),("b",5)]

result1 = sc.parallelize(data).sortBy(lambda x:x[0])

# [('a', 3), ('b', 5), ('d', 1), ('e', 2)]

print(result1.collect())

result2 = sc.parallelize(data).sortBy(lambda x:x[1])

# [('d', 1), ('e', 2), ('a', 3), ('b', 5)]

print(result2.collect())

sortByKey,表示对于K-V类型的数据,按照数据的key进行排序;

data = [("a",3),("b",5),("g",3),("b",1),("k",2),("c",5),("g",2),("f",5),("z",2),("t",5),("d",1),("e",2),("n",2),("l",5)]

# 按照key升序排列,且要求全局有序

result1 = sc.parallelize(data,3).sortByKey(ascending=True, numPartitions=1)

# [('a', 3), ('b', 5), ('b', 1), ('c', 5), ('d', 1), ('e', 2), ('f', 5), ('g', 3), ('g', 2), ('k', 2), ('l', 5), ('n', 2), ('t', 5), ('z', 2)]

print(result1.collect())

# 按照key降序排列,不保证全局有序,只保证各个分区内数据有序

result2 = sc.parallelize(data,3)

# 查看当前分区情况

# [[('a', 3), ('b', 5), ('g', 3), ('b', 1)], [('k', 2), ('c', 5), ('g', 2), ('f', 5)], [('z', 2), ('t', 5), ('d', 1), ('e', 2), ('n', 2), ('l', 5)]]

print(result2.glom().collect())

# [('a', 3), ('b', 5), ('b', 1), ('c', 5), ('d', 1), ('e', 2), ('f', 5), ('g', 3), ('g', 2), ('k', 2), ('l', 5), ('n', 2), ('t', 5), ('z', 2)]

print(result2.sortByKey(ascending=True, numPartitions=3).collect())

2.2 动作算子Action

动作算子的结果不是RDD对象,它相当于是一个指令来让整个执行计划开始真正工作;通常在Action算子之前都会有一系列的转换算子链条,最终通过一个Action算子来结束当前的计算链条,同时其也是启动整个计算链条工作的开关;

countByKey,对于K-V类型的数据,统计Key出现的次数;

rdd = sc.parallelize([("a",1),("b",1),("a",2),("b",3),("a",2)])

result = rdd.countByKey();

# defaultdict(, {'a': 3, 'b': 2})

print(result)

collect,将RDD各个分区内的数据统一收集到Driver中,形成一个List;

这个在上面的例子中都有介绍,就不额外举例了。

reduce,表示对RDD内部的元素使用指定的函数进行迭代计算,rdd.reduce(func)

rdd = sc.parallelize([2,1,3,4])

result = rdd.reduce(lambda x,y:x*y)

# 2*1*3*4=24

print(result)

fold,功能和reduce类似,但是可以接受带初始值进行聚合;

# 在三个分区平均放置1~9

rdd = sc.parallelize(range(1,10),3)

# [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

print(rdd.glom().collect())

result = rdd.fold(10, lambda x,y:x+y)

# 10 + (10+1+2+3) + (10+4+5+6) + (10+7+8+9) = 85

print(result)

first,取出RDD中的第一个元素;

take,取出RDD中的前n个元素,组合成List返回;

top,对RDD中的数据进行降序排列,取前N个;

count,计算RDD中有多少条数据,返回一个数字;

takeSalmpe,从RDD中随机抽取数据;

takeOrdered,对RDD中的数据进行排序,然后取前N个;

2.3 分区算子

分区上的数据在本分区内由各自的Executor分别直接执行,不用经过Driver;通常我们需要控制并发数和分区数一致时可以考虑试用分区算子。

foreach,对RDD中的每一个元素,执行提供的逻辑操作;

rdd = sc.parallelize([1,2,3,4,5,6,9],3)

# 在每次迭代中打印出计算的内容

r = rdd.foreach(lambda x: print(x * 10))

# 由于foreach没有返回值,此处打印为None

print(r)

# foreach并没有修改原始数据集合,因此打印内容仍然为[1, 2, 3, 4, 5, 6, 9]

print(rdd.collect())

saveAsTextFile,将RDD的数据写入文本或者HDFS等文件系统;注意:分区执行,不经过Driver,因此在每个分区上都会有一个结果文件;

# 要求将数据分为三个分区,实验效果才够明显

rdd = sc.parallelize([1,2,3,4,5,6,9],3)

# 将分区详情打印出来,理论上将和保存结果一致,[[1, 2], [3, 4], [5, 6, 9]]

print(rdd.glom().collect())

# 要求data文件不能已存在

rdd.saveAsTextFile("E:\data")

执行完成后,我们打开文件夹就能看到三个输出文件:

part-00000,里面内容为1,2

part-00001,里面内容为3,4

part-00002,里面内容为5,6,9

foreachPartition,功能和foreach一致,只不过是对每个分区分别进行处理;

rdd = sc.parallelize([1,2,3,4,5,6,9],3)

# [[1, 2], [3, 4], [5, 6, 9]]

print(rdd.glom().collect())

def operate(data):

print("------")

result = list()

for i in data:

result.append(i * 10)

print(result)

# ------

# [10, 20]

# ------

# [30, 40]

# ------

# [50, 60, 90]

rdd.foreachPartition(operate)

mapPartitions,功能同map,只不过是对每个分区分别进行处理;

rdd = sc.parallelize([1,2,3,4,5],3)

# [[1], [2, 3], [4, 5]]

print(rdd.glom().collect())

def add(data):

yield sum(data)

# [[1], [5], [9]]

print(rdd.mapPartitions(add).glom().collect())

partitionBy,对RDD中的数据进行自定义分区操作;

repartition,对RDD的分区进行重新分区;

coalesce,对分区的数量进行增减;

mapValues,针对KV形式的数据,对其内部的value进行map操作;

这里只是列举了常用的一些算子,详细的内容请参考官方文档:API Reference — PySpark 3.3.2 documentation (apache.org)

三、案例演示与分析

3.1 wordcount

我们在项目根目录下新建一个待统计的文本文件,当然你可以指定其它任何位置的文件皆可:

hello spark

hello python

hello pyspark

welcome to spark world

现在我们需要编写一个基于pyspark的计数程序:

# 读取文件转换为RDD

fileRdd = sc.textFile("./word.txt")

# 对每行数据按章空格进行切割,并去除每行自带的嵌套

wordRdd = fileRdd.flatMap(lambda x : x.split(" "))

# ['hello', 'spark', 'hello', 'python', 'hello', 'pyspark', 'welcome', 'to', 'spark', 'world']

print(wordRdd.collect())

# 每个单词计数为1,形成KV形式

wordPairRdd = wordRdd.map(lambda x : (x, 1))

# [('hello', 1), ('spark', 1), ('hello', 1), ('python', 1), ('hello', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('spark', 1), ('world', 1)]

print(wordPairRdd.collect())

# 对单词进行规约reduce计算,相同的单词计数器相加

wordCountRdd = wordPairRdd.reduceByKey(lambda a, b : a + b)

# [('hello', 3), ('spark', 2), ('python', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('world', 1)]

print(wordCountRdd.collect())

# 将RDD对象转换为List进行输出

result = wordCountRdd.collect()

# [('hello', 3), ('spark', 2), ('python', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('world', 1)]

print(result)

3.2 商品筛选

现在我们要基于如下项目根目录下的商品信息进行商品数据的提取:要求从中提取出北京的商品,去重展示。

{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}

{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}

{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}

{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}

我们编写的pyspark的程序如下:

from pyspark import SparkConf

from pyspark import SparkContext

import json

if __name__ == '__main__':

conf=SparkConf().setAppName("test1").setMaster("local")

sc=SparkContext(conf=conf)

# 读取文件转换为RDD

fileRdd = sc.textFile("./goods.txt")

# 每行数据按照分隔符进行分割,得到单独的json数据

jsonRdd = fileRdd.flatMap(lambda x : x.split("|"))

# print(jsonRdd.collect())

# 使用json库转为字典类型

dictRdd = jsonRdd.map(lambda x : json.loads(x))

# {'id': 1, 'timestamp': '2019-05-08T01:03.00Z', 'category': '平板电脑', 'areaName': '北京', 'money': '1450'}等等8条

print(dictRdd.collect())

# 筛选北京的字典KV数据

beijingDictRdd = dictRdd.filter(lambda x : x["areaName"] == "北京")

# {'id': 20, 'timestamp': '2019-05-08T01:01.00Z', 'category': '电脑', 'areaName': '北京', 'money': '2450'}等等5条

print(beijingDictRdd.collect())

# 将北京的地域名和商品名拼接起来

resultRdd = beijingDictRdd.map(lambda x : x["areaName"] + "_" + x["category"])

# ['北京_电脑', '北京_食品', '北京_服饰']

print(resultRdd.distinct().collect())

四、踩坑小记

4.1 Too many parameters

在使用某些算子进行实现的时候,比如reduceByKey、distinct的时候,明明代码案例和官网示例一模一样,但就是执行报错:

TypeError: Too many parameters for typing.Iterable; actual 2, expected 1

后来在stackoverflow上查到一个类似的,说是python版本和spark版本不兼容导致的,我出现问题的版本是:

python3.6

spark3.3.2

然后将python更新到3.7版本后,该报错问题就解决了:

conda install python=3.7