pyspark环境搭建

配置hadoop

spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到hadoop的winutils和hadoop.dll,首先我们必须配置好hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils

gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils

我选择了使用这个仓库提供的最高的hadoop版本3.0.0将其解压到d:\deploy\hadoop-3.0.0目录下,然后配置环境变量:

我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:

不过这步需要拥有管理员权限才可以操作。

为了能够在任何地方使用winutils命令工具,将%hadoop_home%\bin目录加入环境变量中:

安装pyspark与java

首先,我们安装spark当前(2022-2-17)的最新版本:

需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出java.lang.illegalargumentexception: unsupported class file major version 55的错误。

这是因为pyspark内置了scala,而scala是基于jvm的编程语言,scala与jdk的版本存在兼容性问题,jdk与scala的版本兼容性表:

jdk version minimum scala versions recommended scala versions
17 2.13.6, 2.12.15 (forthcoming) 2.13.6, 2.12.15 (forthcoming)
16 2.13.5, 2.12.14 2.13.6, 2.12.14
13, 14, 15 2.13.2, 2.12.11 2.13.6, 2.12.14
12 2.13.1, 2.12.9 2.13.6, 2.12.14
11 2.13.0, 2.12.4, 2.11.12 2.13.6, 2.12.14, 2.11.12
8 2.13.0, 2.12.0, 2.11.0, 2.10.2 2.13.6, 2.12.14, 2.11.12, 2.10.7
6, 7 2.11.0, 2.10.0 2.11.12, 2.10.7

当前3.2.1版本的pyspark内置的scala版本为2.12.15,意味着jdk17与其以下的所有版本都支持。

这里我依然选择安装jdk8的版本:

测试一下:

jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):

绿化版java11的环境配置与python调用java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安装

pip安装当前最新的graphframes:

然后在官网下载graphframes的jar包。

下载地址:https://spark-packages.org/package/graphframes/graphframes

由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:

然后将该jar包放入pyspark安装目录的jars目录下:

pyspark安装位置可以通过pip查看:

使用方法

学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html

在下面的网页,官方提供了在线jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

启动spark并读取数据

本地模式启动spark:

sparksession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。

点击environment选项卡可以查看当前环境中的变量:

启动hive支持

找到pyspark的安装位置,例如我的电脑在d:\miniconda3\lib\site-packages\pyspark

手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了mysql作为原数据库,则还需要将mysql对应的驱动jar包放入spark的jars目录下。

创建spark会话对象时可通过enablehivesupport()开启hive支持:

spark访问hive自己创建的表有可能会出现如下的权限报错:

caused by: java.lang.runtimeexception: the root scratch dir: /tmp/hive on hdfs s
hould be writable. current permissions are: rwx——

是因为当前用户不具备对\tmp\hive的操作权限:

把\tmp\hive目录的权限改为777即可顺利访问:

spark的dataframe与rdd

从spark2.x开始将rdd和dataframe的api统一抽象成dataset,dataframe就是dataset[row],rdd则是dataset.rdd。可以将dataframe理解为包含结构化信息的rdd。

将含row的rdd转换为dataframe只需要调用todf方法或sparksession的createdataframe方法即可,也可以传入schema覆盖类型或名称设置。

dataframe的基础api

dataframe默认支持dsl风格语法,例如:

将dataframe注册成表或视图之后即可进行纯sql操作:

pyspark可以直接很方便的注册udf并直接使用:

执行结果:

[row(length=’4′)]
[row(length=’3′)]

rdd的简介

dataframe的本质是对rdd的包装,可以理解为dataframe=rdd[row]+schema。

rdd(a resilient distributed dataset)叫做弹性可伸缩分布式数据集,是spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。

在每一个rdd内部具有五大属性:

  • 具有一系列的分区
  • 一个计算函数操作于每一个切片
  • 具有一个对其他rdd的依赖列表
  • 对于 key-value rdds具有一个partitioner分区器
  • 存储每一个切片最佳计算位置

一组分片(partition),即数据集的基本组成单位。对于rdd来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建rdd时指定rdd的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的cpu core的数目。

**一个计算每个分区的函数。**spark中rdd的计算是以分片为单位的,每个rdd都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

**rdd之间的依赖关系。**rdd的每次转换都会生成一个新的rdd,所以rdd之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对rdd的所有分区进行重新计算。

**一个partitioner,即rdd的分片函数。**当前spark中实现了两种类型的分片函数,一个是基于哈希的hashpartitioner,另外一个是基于范围的rangepartitioner。只有对于于key-value的rdd,才会有partitioner,非key-value的rdd的parititioner的值是none。partitioner函数不但决定了rdd本身的分片数量,也决定了parent rdd shuffle输出时的分片数量。

**一个列表,存储存取每个partition的优先位置(preferred location)。**对于一个hdfs文件来说,这个列表保存的就是每个partition所在的块的位置。按照“移动数据不如移动计算”的理念,spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

rdd的api概览

rdd包含transformation apiaction api,transformation api都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行action api时这些转换才会真正运行。

transformation api产生的两类rdd最重要,分别是mappartitionsrddshuffledrdd

产生mappartitionsrdd的算子有map、keyby、keys、values、flatmap、mapvalues 、flatmapvalues、mappartitions、mappartitionswithindex、glom、filter和filterbyrange 。其中用的最多的是mapflatmap,但任何产生mappartitionsrdd的算子都可以直接使用mappartitionsmappartitionswithindex实现。

产生shuffledrdd的算子有combinebykeywithclasstag、combinebykey、aggregatebykey、foldbykey 、reducebykey 、distinct、groupbykey、groupby、partitionby、sortbykey 和 repartitionandsortwithinpartitions。

combinebykey到groupbykey 底层均是调用combinebykeywithclasstag方法:

三个重要参数的含义:

  • createcombiner:根据每个分区的第一个元素操作产生一个初始值
  • mergevalue:对每个分区内部的元素进行迭代合并
  • mergecombiners:对所有分区的合并结果进行合并

groupbykey的partitioner未指定时会传入默认的defaultpartitioner。例如:

aggregatebykey:每个分区使用zerovalue作为初始值,迭代每一个元素用seqop进行合并,对所有分区的结果用combop进行合并。例如:

reducebykey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:

action api有:

动作 含义
reduce(func) 通过func函数聚集rdd中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回rdd的元素个数
first() 返回rdd的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takesample(withreplacement*,*num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeordered(n, [ordering]) 排序并取前n个元素
saveastextfile(path) 将数据集的元素以textfile的形式保存到hdfs文件系统或者其他支持的文件系统,对于每个元素,spark将会调用tostring方法,将它装换为文件中的文本
saveassequencefile(path) 将数据集中的元素以hadoop sequencefile的格式保存到指定的目录下,可以使hdfs或者其他hadoop支持的文件系统。
saveasobjectfile(path) 将rdd中的元素用nullwritable作为key,实际元素作为value保存为sequencefile格式
countbykey() 针对(k,v)类型的rdd,返回一个(k,int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

spark模拟实现mapreduce版wordcount:

各类rdd

  • shuffledrdd :表示需要走shuffle过程的网络传输
  • coalescedrdd :用于将一台机器的多个分区合并成一个分区
  • cartesianrdd :对两个rdd的所有元素产生笛卡尔积
  • mappartitionsrdd :用于对每个分区的数据进行特定的处理
  • cogroupedrdd :用于将2~4个rdd,按照key进行连接聚合
  • subtractedrdd :用于对2个rdd求差集
  • unionrddpartitionerawareunionrdd :用于对2个rdd求并集
  • zippedpartitionsrdd2:zip拉链操作产生的rdd
  • zippedwithindexrdd:给每一个元素标记一个自增编号
  • partitionwisesampledrdd:用于对rdd的元素按照指定的百分比进行随机采样

当我们需要给datafream添加自增列时,可以使用zipwithuniqueid方法:

api用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.rdd.html#pyspark.rdd

cache&checkpoint

rdd通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该rdd将会被缓存在计算节点的内存中,并供后面重用。

checkpoint的源码注释可以看到:

  • 标记该rdd作为检查点。
  • 它将被保存在通过sparkcontext#setcheckpointdir方法设置的检查点目录中
  • 它所引用的所有父rdd引用将全部被移除
  • 这个方法在这个rdd上必须在所有job执行前运行。
  • 强烈建议将这个rdd缓存在内存中,否则这个保存文件的计算任务将重新计算。

从中我们得知,在执行checkpoint方法时,最好同时,将该rdd缓存起来,否则,checkpoint也会产生一个计算任务。

graphframes 的用法

graphframe是将spark中的graph算法统一到dataframe接口的graph操作接口,为scala、java和python提供了统一的图处理api。

graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes

可以参考:

  • 官网:https://graphframes.github.io/graphframes/docs/_site/index.html
  • graphframes用户指南-python — databricks文档:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

在graphframes中图的顶点(vertex)和边(edge)都是以dataframe形式存储的:

  • 顶点dataframe:必须包含列名为“id”的列,用于作为顶点的唯一标识
  • 边dataframe:必须包含列名为“src”和“dst”的列,根据唯一标识id标识关系

创建图的示例:

graphframe提供三种视图:

获取顶点的度、入度和出度:

motif finding (模式发现)

示例:

假设我们要想给用户推荐关注的人,可以找出这样的关系:a关注b,b关注c,但是a未关注c。找出这样的关系就可以把c推荐给a:

结果:

+—+—+
|  a|  c|
+—+—+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+—+—+

motif在查找路径过程的过程中,还可以沿着路径携带状态。例如我们想要找出关系链有4个顶点,而且其中3条边全部都是”friend”关系:

结果:

+—————+————–+—————+————–+—————+————–+—————+
|              a|            ab|              b|            bc|              c|            cd|              d|
+—————+————–+—————+————–+—————+————–+—————+
| {a, alice, 34}|{a, e, friend}|{e, esther, 32}|{e, d, friend}| {d, david, 29}|{d, a, friend}| {a, alice, 34}|
|{e, esther, 32}|{e, d, friend}| {d, david, 29}|{d, a, friend}| {a, alice, 34}|{a, b, friend}|   {b, bob, 36}|
| {d, david, 29}|{d, a, friend}| {a, alice, 34}|{a, e, friend}|{e, esther, 32}|{e, d, friend}| {d, david, 29}|
|{e, esther, 32}|{e, d, friend}| {d, david, 29}|{d, a, friend}| {a, alice, 34}|{a, e, friend}|{e, esther, 32}|
+—————+————–+—————+————–+—————+————–+—————+

subgraphs 子图

可以直接过滤其顶点或边,dropisolatedvertices()方法用于删除孤立没有连接的点:

还可以基于模式发现获取到的边创建subgraphs :

graphframes支持的graphx算法

  • pagerank:查找图中的重要顶点。
  • 广度优先搜索(bfs):查找从一组顶点到另一组顶点的最短路径
  • 连通组件(connectedcomponents):为具备连接关系的顶点分配相同的组件id
  • 强连通组件(stronglyconnectedconponents):根据每个顶点的强连通分量分配scc。
  • 最短路径(shortest paths):查找从每个顶点到目标顶点集的最短路径。
  • 三角形计数(trianglecount):计算每个顶点所属的三角形的数量,经常用于确定组的稳定性(相互连接的数量代表了稳定性)或作为其他网络度量(如聚类系数)的一部分,在社交网络分析中用来检测社区。
  • 标签传播算法(lpa):检测图中的社区。

pagerank算法:

结果:

+—+——-+—+——————-+
| id|   name|age|           pagerank|
+—+——-+—+——————-+
|  b|    bob| 36| 2.7025217677349773|
|  c|charlie| 30| 2.6667877057849627|
|  a|  alice| 34| 0.4485115093698443|
|  e| esther| 32| 0.3613490987992571|
|  f|  fanny| 36|0.32504910549694244|
|  d|  david| 29|0.32504910549694244|
|  g|  gabby| 60|0.17073170731707318|
+—+——-+—+——————-+

可以设置起始顶点:

广度优先搜索bfs:

搜索从姓名叫esther到年龄小于32的最小路径:

可以指定只能在指定的边搜索:

connected components 连通组件:

必须先设置检查点:

结果:

+—+——-+—+————+
| id|   name|age|   component|
+—+——-+—+————+
|  a|  alice| 34|412316860416|
|  b|    bob| 36|412316860416|
|  c|charlie| 30|412316860416|
|  d|  david| 29|412316860416|
|  e| esther| 32|412316860416|
|  f|  fanny| 36|412316860416|
|  g|  gabby| 60|146028888064|
+—+——-+—+————+

可以看到仅g点在一个连通区域内,可以调用dropisolatedvertices()方法,删除这种孤立的没有连接的点:

结果:

+—+——-+—+————+
| id|   name|age|   component|
+—+——-+—+————+
|  a|  alice| 34|412316860416|
|  b|    bob| 36|412316860416|
|  c|charlie| 30|412316860416|
|  d|  david| 29|412316860416|
|  e| esther| 32|412316860416|
|  f|  fanny| 36|412316860416|
+—+——-+—+————+

strongly connected components 强连通组件:

shortest paths 最短路径:

每个顶点到a或d的最短路径:

triangle count 三角形计数:

说明顶点a/e/d构成三角形。

标签传播算法(lpa):

pyspark3.x与pandas融合

pyspark从3.0版本开始出现了pandas_udf装饰器、applyinpandas和mapinpandas,基于这些方法,我们就可以使用熟悉的pandas的语法处理spark对象的数据。

首先创建几条测试数据,并启动 apache arrow

自定义udf和udaf

pyspark暂不支持自定义udtf。

使用pandas_udf装饰器我们可以创建出基于pandas的udf自定义函数,在dsl的语法中可以被直接使用:

注册函数和视图后,可以直接在sql中使用:

结果均为:

+——-+
|product|
+——-+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+——-+

还支持聚合函数和窗口函数:

注册到udf之后同样可以直接使用sql实现:

结果均为:

+——–+
| mean_v |
+——–+
|     4.2|
+——–+

+—+——–+
| id| mean_v |
+—+——–+
|  1|     1.5|
|  2|     6.0|
+—+——–+

+—+—-+——+
| id|   v|mean_v|
+—+—-+——+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+—+—-+——+

分组聚合与join

applyinpandas需要在datafream调用groupby之后才能使用:

结果:

+—+—-+—-+—-+
| id|   v|  v1|  v2|
+—+—-+—-+—-+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+—+—-+—-+—-+

subtract_mean函数接收的是对应id的dataframe数据,schema指定了返回值的名称和类型列表。

通过以下代码我们可以知道,applyinpandas可以借助cogroup进行表连接:

示例:

map迭代

执行以下代码:

后台看到执行结果为:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前台结果几乎保持原样。可以知道iterator是一个分区迭代器,迭代出当前分区的每一行数据都被封装成一个pandas对象。

pyspark与pandas的交互

将spark的datafream对象转换为原生的pandas对象只需调用topandas()方法即可:

将原生的pandas对象转换为spark对象可以使用spark的顶级方法:

习惯使用pandas的童鞋,还可以直接使用pandas-on-spark,在spark3.2.0版本时已经匹配到pandas 1.3版本的api。通过pandas-on-spark,我们可以完全用pandas的api操作数据,而底层执行却是spark的并行化。

使用pandas-on-spark最好设置一下环境变量:

将spark对象转换为pandas-on-spark对象:

pandas-on-spark对象也可以还原成spark对象:

另外spark提供直接将文件读取成pandas-on-spark对象的api,例如:

ps对象与原生pandas对象的api几乎完全一致。

ps对象相对于原生pandas对象的api几乎一致,同时还支持一些强悍的功能,例如直接以sql形式访问:

{pdf}访问了变量名为pdf的pandas-on-spark对象。

到此这篇关于pyspark与graphframes的安装与使用的文章就介绍到这了,更多相关pyspark与graphframes使用内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!