0%

在Spark程序里面指定jdk版本

[TOC]

在使用spark集群时经常会出现一种情况:集群的JDK版本为1.7,但是自己编写并编译的程序是用1.8版本的jdk。这种程序提交到spark中就会出现

Unsupported major.minor version 52.0

为了解决这种情况,可以通过设置相应参数让spark集群调用对应版本的jdk执行对应的程序。

前提条件

在每一个集群节点上安装好相应的jdk版本,这里的安装是指将jdk压缩包解压到某个目录。

:该方法的缺点是必须在每台节点上安装jdk。为了克服该方法可以查看另一篇文章《Spark无管理权限配置Python或JDK》

在命令行显示调用

使用spark.executorEnv.JAVA_HOMEspark.yarn.appMasterEnv.JAVA_HOME为spark的executor和driver执行jdk路径:

1
2
3
4
5
6
7
8
9
$SPARK_HOME/bin/spark-submit --master yarn-cluster  \
--executor-memory 8g \
--num-executors 80 \
--queue eggzl \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/home/user/java/jdk1.8.0_25" \
--conf "spark.executorEnv.JAVA_HOME=/home/user/java/jdk1.8.0_25" \
--executor-cores 1 \
--class com.xx.xx \
/home/user/spark/app.jar

使用spark_default.conf

1
2
spark.yarn.appMasterEnv.JAVA_HOME  /home/user/java/jdk1.8.0_25
spark.executorEnv.JAVA_HOME /home/user/java/jdk1.8.0_25

其他的环境变量也可以通过该配置文件获取,比如 spark.executorEnv.MY_BLOG=www.zzz.com 这样我们就可以从程序里面获取 MY_BLOG 环境变量的值。

在运行Spark应用程序的时候,driver会提供一个webUI给出应用程序的运行信息,但是该webUI随着应用程序的完成而关闭端口,也就是说,Spark应用程序运行完后,将无法查看应用程序的历史记录。Spark history server就是为了应对这种情况而产生的,通过配置,Spark应用程序在运行完应用程序之后,将应用程序的运行信息写入指定目录,而Spark history server可以将这些运行信息装载并以web的方式供用户浏览。

要使用history server,对于提交应用程序的客户端需要配置以下参数(在conf/spark-defaults.conf中配置):

1
2
spark.eventLog.enabled  true 
spark.eventLog.dir hdfs://hadoop1:8000/sparklogs

进入$SPARK_HOME/sbin路径

1
2
./start-all.sh
./start-history-server.sh

注意:会启动失败,控制台显示

1
2
3
4
5
6
hadoop@Node4:/usr/local/SPARK/spark-1.1.0-bin-hadoop2.4/sbin$ ./start-history-server.sh 
starting org.apache.spark.deploy.history.HistoryServer, logging to /usr/local/SPARK/spark-1.1.0-bin-hadoop2.4/sbin/../logs/spark-hadoop-org.apache.spark.deploy.history.HistoryServer-1-Node4.out
failed to launch org.apache.spark.deploy.history.HistoryServer:
at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:41)
... 6 more
full log in /usr/local/SPARK/spark-1.1.0-bin-hadoop2.4/sbin/../logs/spark-hadoop-org.apache.spark.deploy.history.HistoryServer-1-Node4.out

找到日志文件,发现报错 Logging directory must be specified
解决:在启动historyserver的时候需要加上参数,指明log的存放位置,例如,我们在spark-default.conf中配置的存放路径为hdfs://hadoop1:8000/sparklogs
有下面两种方法解决问题
1. 将启动命令改成

1
start-history-server.sh hdfs://node4:9000/directory

2. 启动命令不变,在conf/spark-env.sh中添加

1
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node4:9000/directory"

这样,在启动HistoryServer之后,在浏览器中打开http://node4:18080就可以看到web页面了

附:在conf/spark-defaults.conf中配置参数

history server相关的配置参数描述

1) spark.history.updateInterval
  默认值:10
  以秒为单位,更新日志相关信息的时间间隔

2)spark.history.retainedApplications
  默认值:50
  在内存中保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,当再次访问已被删除的应用信息时需要重新构建页面。

3)spark.history.ui.port
  默认值:18080
  HistoryServer的web端口

4)spark.history.kerberos.enabled
  默认值:false
  是否使用kerberos方式登录访问HistoryServer,对于持久层位于安全集群的HDFS上是有用的,如果设置为true,就要配置下面的两个属性

5)spark.history.kerberos.principal
  默认值:用于HistoryServer的kerberos主体名称

6)spark.history.kerberos.keytab
  用于HistoryServer的kerberos keytab文件位置

7)spark.history.ui.acls.enable
  默认值:false
  授权用户查看应用程序信息的时候是否检查acl。如果启用,只有应用程序所有者和spark.ui.view.acls指定的用户可以查看应用程序信息;否则,不做任何检查

8)spark.eventLog.enabled
  默认值:false
  是否记录Spark事件,用于应用程序在完成后重构webUI

9)spark.eventLog.dir
  默认值:file:///tmp/spark-events
  保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建

10)*spark.eventLog.compress *
  默认值:false
  是否压缩记录Spark事件,前提spark.eventLog.enabled为true,默认使用的是snappy

以spark.history开头的需要配置在spark-env.sh中的SPARK_HISTORY_OPTS,以spark.eventLog开头的配置在spark-defaults.conf

spark参数介绍

[TOC]

1 spark.driver.memory

设置分配给spark driver程序的内存,driver端会运行DAGScheduler、Backend等等进程。

2 spark.executor.cores

单个Executor使用的CPU核数。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

3 spark.executor.memory

设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

4 spark.executor.instances

该参数对于静态分配,表示Executor的数量。

如果启用了spark.dynamicAllocation.enabled,则表示Executors的初始大小。

Spark无管理权限配置Python或JDK

[TOC]

部署java

将对应的jdk文件夹压缩成tar.gz

tar -zcf jdk1.8.0_77.tar.gz jdk1.8.0_77

然后在提交任务的时候添加某些参数:

1
2
3
4
5
$SPARK_HOME/bin/spark-submit --master yarn-cluster  \
--conf "spark.yarn.dist.archives=/home/zzz/jdk1.8.0_77.tar.gz" \
--conf "spark.executorEnv.JAVA_HOME=./jdk1.8.0_77.tar.gz/jdk1.8.0_77"\
--conf "spark.yarn.appMasterEnv.JAVA_HOME=./jdk1.8.0_77.tar.gz/jdk1.8.0_77" \
xxxxx
  • spark.yarn.dist.archives:逗号分隔的压缩包。运行时,会将这些压缩包分发到所有的executor节点的工作目录,executor节点会自动解压这些压缩包。路径规则参考上面的命令。

运行结果《在hadoop集群的任一台机器上执行 ps -ef | grep jdk1.8》

1
2
3
4
5
6
hadoop   110334 110332  0 13:40 ?        00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp '-XX:MaxPermSize=2048m' '-XX:PermSize=512m' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'org.apache.spark.ml.alogrithm.SmartRules' --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/zl-workflow-component-0.3.2-20180320-1101.jar --arg 'hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json' --properties-file /home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties 1> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stdout 2> /home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001/stderr
hadoop 110891 110334 99 13:40 ? 00:00:34 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx1024m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/tmp -XX:MaxPermSize=2048m -XX:PermSize=512m -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class org.apache.spark.ml.alogrithm.SmartRules --jar hdfs://slave131:9000/user/mls_zl/lib2/cmpt/zl-workflow-component-0.3.2-20180320-1101.jar --arg hdfs://slave131:9000/user/mls_3.5/proc/1/11/92/submit_SmartRules_37Client.json --properties-file /home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000001/__spark_conf__/__spark_conf__.properties
hadoop 111013 111010 0 13:40 ? 00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002/stderr
hadoop 111567 111013 99 13:40 ? 00:00:32 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000002 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 1 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000002/__app__.jar
hadoop 111619 111616 0 13:40 ? 00:00:00 /bin/bash -c ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=37011' -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar 1>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stdout 2>/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003/stderr
hadoop 112178 111619 99 13:40 ? 00:00:50 ./jdk-8u161-linux-x64.tar.gz/jdk1.8.0_161/bin/java -server -Xmx4096m -Djava.io.tmpdir=/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/tmp -Dspark.ui.port=0 -Dspark.driver.port=37011 -Dspark.yarn.app.container.log.dir=/home/hadoop/hadoop-2.7.3/logs/userlogs/application_1519271509270_0745/container_1519271509270_0745_01_000003 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.100.1.131:37011 --executor-id 2 --hostname slave131 --cores 8 --app-id application_1519271509270_0745 --user-class-path file:/home/hadoop/tmp/nm-local-dir/usercache/zhoulong/appcache/application_1519271509270_0745/container_1519271509270_0745_01_000003/__app__.jar

部署python

将对应的python压缩成tar.gz

$ tar -zcf anaconda2.tar.gz anaconda2

然后操作如下:

1
2
3
4
5
export PYSPARK_PYTHON=./anaconda2.tar.gz/anaconda2/bin/python
$SPARK_HOME/bin/pyspark
--conf "spark.yarn.dist.archives=/home/iteblog/anaconda2.tar.gz" \
--conf "spark.executorEnv.PYSPARK_PYTHON=./anaconda2.tar.gz/anaconda2/bin/python"\
--master yarn-client

缓存方法

[TOC]

来源: spark中cache和persist的区别

相同点

在spark中,cache和persist都是用于将一个RDD进行缓存的,这样在之后的使用过程中就不需要重新进行计算了,可以大大节省程序运行的时间。

区别

两者的区别在于:cache 其实是调用了 persist 方法,缓存策略为 MEMORY_ONLY。而 persist 可以通过设置参数有多种缓存策略。

persist12种策略

1
2
3
4
5
6
7
8
9
10
11
12
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)

这12种策略都是由5个参数决定的。StorageLevel 构造函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
  • replication:备份数(在多个节点上备份)

另外还注意到OFF_HEAP这种策略

1
val OFF_HEAP = new StorageLevel(false, false, true, false)

使用了堆外内存,它不能和其它几个参数共存。

1
2
3
4
5
6
if (useOffHeap) {
require(!useDisk, "Off-heap storage level does not support using disk")
require(!useMemory, "Off-heap storage level does not support using heap memory")
require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}

共享变量 -广播变量与累加器(转)

[TOC]

来源: Spark学习笔记——广播变量与累加器

一、Spark 广播变量

1.引入

我们声明定义的变量是在Driver中产生,算子中的匿名函数是在Executor中执行的。也就是如果在Driver中定义的变量最终是要发送到task中去,task需要引用executor中线程池执行,而Executor是一个jvm进程,变量副本过多会占用jvm过多的堆内存,会引起频繁的GC、OOM。如果不使用广播变量,那么有多少个task就会在集群中有多少个变量副本。所以为了解决变量占用内存的问题,我们直接在executor层面保存一份变量即可。不用给每一个task都保存一份变量,只需要保存executor的个数那么多个。

2.广播变量的原理

task在执行的时候如果使用到了广播变量,它会找本地管理广播变量的组件(BlockManager)去要,如果本地的BlockManager中没有广播变量,BlockManager会去Driver端(有一个BlockManagerMaster组件)去拉取广播变量。

广播变量不是Driver主动发给executor的,而是等到哪个task执行使用到了广播变量,根据需要去取,免得浪费资源。

3.使用流程

Driver端:

1
val broadcast=sc.broatcast(变量) 广播的变量可以是基本类型和集合

Executor端:

1
broadcast.value

【注意事项】

1.广播变量只能在Driver端定义,不能在Executor端定义。

2.广播变量在Executor端不能修改。

4.广播变量+map实现join算子

因为join算子会产生shuffle,shuffle过程会有数据的移动,数据的读写I/O,占用过多的资源。所以我们在编写程序时尽量避免使用shuffle类的算子。

使用广播变量+map 实现join

适用场景:

1
2
一个RDD的数据量比较大,一个RDD的数据量比较小,适合用这种方式来取代join.
如果两个RDD的数据量都特别的大,那么会造成executor进程的OOM.

[代码演示]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TestBroadCast {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("BroadCast")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, String>> nameList = Arrays.asList(
new Tuple2<String, String>("1", "zhangsan"),
new Tuple2<String, String>("2", "lisi"),
new Tuple2<String, String>("3", "wangwu")
);
List<Tuple2<String, String>> scoreList = Arrays.asList(
new Tuple2<String, String>("1", "90"),
new Tuple2<String, String>("2", "80"),
new Tuple2<String, String>("3", "89")
);
JavaPairRDD<String, String> nameRDD = sc.parallelizePairs(nameList);
JavaPairRDD<String, String> scoreRDD = sc.parallelizePair(scoreList);
List<Tuple2<String, String>> collect = nameRDD.collect();
Map<String, String> nameMap = new HashMap<>();
for (Tuple2<String, String> tuple2 : collect) {
nameMap.put(tuple2._1, tuple2._2);
}
final Broadcast<Map<String, String>> nMB =sc.broadcast(nameMap);
scoreRDD.map(new Function<Tuple2<String,String>,String>(){
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> tuple) {
Map<String, String> nameMap = nMB.value();
String id=tuple._1;
String score=tuple._2;
String name=nameMap.get(id);
if(name != null){
System.out.println(id +" name:"+name +" score:"+score);
}
return null;
}
}).collect();
sc.stop();
}
}

[执行结果]

1
2
3
`id:1 name:zhangsan score:90
id:2 name:lisi score:80
id:3 name:wangwu score:89`

二、累加器

1.什么是累加器?

累加器可以看成是一个集群规模级别的一个大变量。

2.累加器与广播变量比较

累加器是在Driver端创建,在Driver端读取,在Executor端操作(累加操作),在Executor端是不能读取的。

广播变量是在Driver端创建,在Executor端读取,在Executor端不能修改。

3.利用累加器算文件的行数

[代码演示]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestAccumulator {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("BroadCast")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
final Accumulator<Integer> accumulator = sc.accumulator(0);
JavaRDD<String> userLogRDD = sc.textFile("cs");
userLogRDD.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String s) throws Exception {
accumulator.add(1);
}
});
System.out.println("line count:" + accumulator.value());
sc.stop();
}
}

4.累加器的错误用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
if(x%2 == 0){
accum += 1
0
}else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value
//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value

原因分析

官方对这个问题的解释如下描述:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。

所以在第一次count(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。

之后又对新产生的的newData进行了一次foreach(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

img

解决办法

看了上面的分析,大家都有这种印象了,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。

事实上,还是有解决方案的,只要将任务之间的依赖关系切断就可以了。什么方法有这种功能呢?你们肯定都想到了,cache,persist。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响到了。

img

1
2
3
4
5
6
7
8
9
10
11
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value
newData.foreach(println)
//此时的accum依旧是5
accum.value

注:使用Accumulator时,为了保证准确性,只使用一次action操作。如果需要使用多次则使用cache或persist操作切断依赖。

一、数据倾斜调优

1.1 调优概述

有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。

1.2 数据倾斜发生时的现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

1.3 数据倾斜发生的原理

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

数据倾斜原理

1.4 如何定位导致数据倾斜的代码

数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

1.4.1 某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。

如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

img

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里我们可以介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

  • stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。
  • stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。
1
2
3
4
5
6
7
8
9
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由educeByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

1.4.2 某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

1.5 查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:

  1. 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
  2. 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

1
2
3
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

1.6 数据倾斜的解决方案

1.6.1 解决方案一:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。

1.6.2 解决方案二:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

1.6.3 解决方案三:提高shuffle操作的并行度

方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

img

1.6.4 解决方案四:两阶段聚合(局部聚合+全局聚合)

方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});

// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});

// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});

// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});

1.6.5 解决方案五:将reduce join转为map join

方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。

方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。

方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
throws Exception {
// 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
// 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// 获取当前RDD数据的key以及value。
String key = tuple._1;
String value = tuple._2;
// 从rdd1数据Map中,根据key获取到可以join到的数据。
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
}
});

// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。

1.6.6 解决方案六:采样倾斜key并分拆join操作

方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

方案实现思路:

  • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
  • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  • 而另外两个普通的RDD就照常join即可。
  • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。
// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。
// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;

// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});

// rdd2,就是那个所有key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
new Function<Tuple2<Long,Row>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(
Tuple2<Long, Row> tuple) throws Exception {
Random random = new Random();
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
}
return list;
}

});

// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。
// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
})
.join(skewedUserid2infoRDD)
.mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, Row>> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
long key = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
}
});

// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

1.6.7 解决方案七:使用随机前缀和扩容RDD进行join

方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

方案实现思路:

  • 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。
  • 然后将该RDD的每条数据都打上一个n以内的随机前缀。
  • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
  • 最后将两个处理后的RDD进行join即可。

方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

方案实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
}
return list;
}
});

// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});

// 将两个处理后的RDD进行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

1.6.8 解决方案八:多种方案组合使用

在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,我们针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。大家需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

二、shuffle调优

2.1 调优概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

2.2 ShuffleManager发展概述

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

下面我们详细分析一下HashShuffleManager和SortShuffleManager的原理。

2.3 HashShuffleManager运行原理

2.3.1 未经优化的HashShuffleManager

下图说明了未经优化的HashShuffleManager的原理。这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

img

2.3.2 优化后的HashShuffleManager

下图说明了优化后的HashShuffleManager的原理。这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

img

2.4 SortShuffleManager运行原理

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

2.4. 1普通运行机制

下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
img

2.4.2 bypass运行机制

下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
img

2.5 shuffle相关参数调优

以下是Shffule过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

  • 默认值:32k
  • 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m
  • 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默认值:3
  • 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
  • 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s
  • 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
  • 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2
  • 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

  • 默认值:sort
  • 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
  • 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200
  • 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
  • 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false
  • 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
  • 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

RDD函数

[TOC]

RDD提供了两种类型的操作:transformation 和 action

所有的 transformation 都是采用的懒策略,如果只是将 transformation 提交是不会执行计算的,计算只有在action被提交的时候才被触发。

一、基本RDD

抽象类RDD包含了各种数据类型的RDD都适用的通用操作。

1.1 transformation操作

针对各个元素的转化操作

1.1.1 map(func)

接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果

1
2
3
4
5
scala> val words = List("zks","zhaikaishun","kaishun","kai","xiaozhai")
words: List[String] = List(zks, zhaikaishun, kaishun, kai, xiaozhai)

scala> words.map(_.length)
res2: List[Int] = List(3, 11, 7, 3, 8)

1.1.2 flatMap(func)

接收一个函数(这个函数的返回结果通常为集合),把这个函数用于 RDD 中的每个元素,将函数的返回结果展平,将展平后的数据作为结果

1
2
3
4
5
scala> val words = List("zks","zhaikaishun","kaishun","kai","xiaozhai")
words: List[String] = List(zks, zhaikaishun, kaishun, kai, xiaozhai)

scala> words.flatMap(_.toList)
res4: List[Char] = List(z, k, s, z, h, a, i, k, a, i, s, h, u, n, k, a, i, s, h, u, n, k, a, i, x, i, a, o, z, h, a, i)

1.1.3 filter(func)

过滤不满足条件的元素。filter操作可能会引起数据倾斜,甚至可能导致空分区,新形成的RDD将会包含这些可能生成的空分区。所有这些都可能会导致问题,要想解决它们,最好在filter之后重新分区。

1.2 伪集合操作

尽管RDD不是严格意义上的集合,但它支持许多数学上的集合操作。注意:这些操作都要求操作的RDD是相同的数据类型的。

1.2.1 distinct()

对RDD中的元素进行去重处理。需要注意的是,distinct操作开销很大,因为它需要shuffle所有数据,以确保每一个元素都只有一份。

1.2.2 union(otherDataset)

合并两个RDD中所有元素的RDD。spark的union并不会去重,这点与数学上的不同。

1.2.3 intersection

返回两个RDD中都有的元素(即取交集)。intersection会在运行时除去所有重复的元素,因此它也需要shuffle,性能要差一些。

1.2.4 subtract

返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle

1.2.5 cartesian(otherDataset)

笛卡尔积。在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。开销非常大。

1.2.6 sample(withReplacement, frac, seed)

根据给定的 seed,从RDD中随机地按 指定比例frac 选一部分记录,创建新的RDD

withReplacement 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样。

1.3 基于分区的转化操作

1.3.1 glom

将每个分区中的所有元素都形成一个数组。如果在处理当前元素时需要使用前后的元素,该操作将会非常有用,不过有时我们可能还需要将分区边界的数据收集起来并广播到各节点以备使用。

1
2
3
4
5
6
val rdd = sc.parallelize(1 to 16,4)
val glomRDD = rdd.glom() //RDD[Array[T]]
glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))

// 输出
int[] //说明RDD中的元素被转换成数组Array[Int]

1.3.2 mapPartitions

基于分区的map,spark对每个分区的迭代器进行操作。

普通的map算子对RDD中的每一个元素进行操作,而 mapPartitions 算子对RDD中每一个分区进行操作。

  • 如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。
  • 如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。

1.3.3 mapPartitionsWithIndex

与mapPartitions不同之处在于带有分区的序号。

1.4 管道(pipe)操作

spark在RDD上提供了pipe()方法。通过pipe(),你可以使用任意语言将RDD中的各元素从标准输入流中以字符串形式读出,并将这些元素执行任何你需要的操作,然后把结果以字符串形式写入标准输出,这个过程就是RDD的转化操作过程。

使用pipe()的方法很简单,假如我们有一个用其他语言写成的从标准输入接收数据并将处理结果写入标准输出的可执行脚本,我们只需要将该脚本分发到各个节点相同路径下,并将其路径作为pipe()的参数传入即可。

创建外部shell脚本:

1
2
3
4
5
#!/bin/sh
echo "Running shell script"
while read LINE; do
echo ${LINE}!
done

Spark rdd 调用:

1
2
3
4
val data = sc.parallelize(List("hi","hello","how","are","you"))
val scriptPath = "/root/echo.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect()

得到结果:

1
Array[String] = Array(Running shell script, hi!, Running shell script, hello!, Running shell script, how!, Running shell script, are!, you!)

1.5 action操作

1.5.1 foreach(func)

对每个元素进行操作,并不会返回结果。这通常用于更新一个累加器变量,或者和外部存储系统做交互

1.5.2 foreachPartition

基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。

1.5.3 reduce(func)

reduce方法将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。

1
2
val c = sc.parallelize(1 to 10)
c.reduce((x, y) => x + y)//结果55

1.5.4 fold(zeroValue)(func)

与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold要求函数返回值类型与RDD元素类型相同。

1
2
val l = List(1,2,3,4)
l.fold(0)((x, y) => x + y)//结果55

1.5.5 aggregate(zeroValue)(seqOp, combOp)

与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。

该函数签名如下:

1
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {...}

zeroValue

需要注意的是,zeroValue 会被每一个分区计算一次.
计算过程中的初始值,同时也确定了最终结果的类型

seqOp 函数 : 对分区中的元素进行迭代计算,将一个分区中的所有元素聚合为一个 U 类型的结果。

参数U 的来源 : 1 zeroValue ; 2 seqOp 的输出结果。即当前的聚合结果。
参数T 的来源 : 原始数据分区中的元素

combOp 函数 : 对 seqOp 函数 产生的结果进行聚合。

参数U 的来源 : seqOp 进行聚合后,产生的结果。

aggregate()使用举例:计算平均数

要算平均值,需要两个值: 一个是rdd的各元素的累加和,另一个是元素计数,初始化为(0, 0)

初始如下:

1
2
val l = List(1,2,3,4)
l.aggregate(0, 0)(seqOp, combOp)

seqOp如下:

1
(x, y) => (x._1 + y, x._2 + 1)
  • x 代表的是zeroValue初始值或者seqOp的返回值,即是(Int, Int)啊,可以用x._1x._2来代指这两个元素的,
  • y代表rdd遍历过程中的元素

因此x._1 + y就是各个元素的累加和,x._2 + 1就是元素计数。遍历完成后返回的(Int, Int)就是累加和元素计数

combOp如下:

1
(x, y) => (x._1 + y._1, x._2 + y._2)

因为我们的计算是分布式计算,combOp是将前面seqOp的结果进行合并的。

例如第一个节点遍历1和2, 返回的是(3, 2),第二个节点遍历3和4, 返回的是(7, 2),那么将它们合并的话就是3 + 7, 2 + 2.

最后程序是这样的:

1
2
3
val l = List(1,2,3,4)
r = l.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
m = r._1 / r._2.toFload

m就是所要求的均值。

1.5.6 count()

返回数据集的元素个数

1.5.7 collect()

以数组的形式,返回数据集的所有元素到Driver节点。collect()函数通常在使用filter或者其它操作减少数据量的函数后再使用。因为如果返回的数据量很大,很可能会让Driver程序OOM

1.5.8 take(n)

take用于获取RDD中从0到n-1下标的元素,不排序。

1.5.9 top

如果为元素定义了顺序,就可以使用top返回前几个元素。

1.5.10 takeSample(withReplacement,num,seed)

根据指定的种子seed,采样返回指定个数num的元素,并以 数组 的形式返回。

二、键值对RDD

PairRDDFunctions封装了用于操作键值对RDD的一些功能函数。一些文件读取操作(sc.sequenceFile()等)会直接返回RDD[(K, V)]类型。在RDD上使用map操作也可以将一个RDD转换为RDD[(K, V)]类型。在用Scala书写的Spark程序中,RDD[(K, V)]类型到PairRDDFunctions类型的转换一般由隐式转换函数完成。

2.1 transformation操作

2.1.1 mapValues

对各个键值对的值进行映射。该操作会保留RDD的分区信息。

1
2
3
4
val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.mapValues(_+1).collect.foreach(println)//对每个value进行+1

结果:

1
2
3
4
(hadoop,2)
(spark,2)
(hive,2)
(spark,2)

2.1.2 flatMapValues

对各个键值对的值进行映射,并将最后结果展平。该操作会保留RDD的分区信息。

1
2
3
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()

结果:

1
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

2.2 聚合操作

2.2.1 reduceByKey(func)

使用 func 函数合并具有相同键的值。reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。

1
2
3
4
val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map((_,1))
pairRdd.reduceByKey(_+_).collect.foreach(println)

结果:

1
2
3
(hive,1)
(spark,2)
(hadoop,1)

2.2.2 foldByKey

与fold类似,就像reduceByKey之于reduce那样。

熟悉MapReduce中的合并器(combiner)概念的你可能已经注意到,reduceByKeyfoldByKey会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并。用户不需要指定合并器。

2.2.3 combineByKey

是最常用的基于键进行聚合的函数,大多数基于键聚合的函数都是用它实现的。与aggregate一样,combineByKey可以让用户返回与输入数据的类型不同的返回值。combineByKey的内部实现分为三步来完成:

  • 首先根据是否需要在map端进行combine操作决定是否对RDD先进行一次mapPartitions操作(利用createCombiner、mergeValue、mergeCombiners三个函数)来达到减少shuffle数据量的作用。
  • 第二步根据partitioner对MapPartitionsRDD进行shuffle操作。
  • 最后在reduce端对shuffle的结果再进行一次combine操作。

2.3 分组操作

2.3.1 groupBy

根据自定义的东东进行分组。groupBy是基本RDD就有的操作。

2.3.2 groupByKey

根据键对数据进行分组。虽然groupByKey+reduce也可以实现reduceByKey一样的效果.

但是请你记住:groupByKey是低效的,而reduceByKey会在本地先进行聚合,然后再通过网络传输求得最终结果。在执行聚合或分组操作时,可以指定分区数以对并行度进行调优。

1
2
3
4
val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.groupByKey().collect.foreach(println)

结果:

1
2
3
(hive,CompactBuffer(1))
(spark,CompactBuffer(1, 1))
(hadoop,CompactBuffer(1))

2.4 连接操作

2.4.1 cogroup

可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。

2.4.2 join

对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。

1
2
3
4
5
6
7
8
9
10
11
12
scala> val a = sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1"),("D","d1"),("E","e1"),("F","f1")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = sc.parallelize(Array(("A","a2"),("B","b2"),("C","c1"),("C","c2"),("C","c3"),("E","e2")))
b: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> a.join(b).collect    // 这里的join是inner join,只返回左右都匹配上的内容

res1: Array[(String, (String, String))] = Array((B,(b1,b2)), (A,(a1,a2)), (C,(c1,c1)), (C,(c1,c2)), (C,(c1,c3)), (E,(e1,e2)))

scala> b.join(a).collect    
res2: Array[(String, (String, String))] = Array((B,(b2,b1)), (A,(a2,a1)), (C,(c1,c1)), (C,(c2,c1)), (C,(c3,c1)), (E,(e2,e1)))

2.4.3 leftOuterJoin

即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。

1
2
3
4
5
scala> a.leftOuterJoin(b).collect
res3: Array[(String, (String, Option[String]))] = Array((B,(b1,Some(b2))), (F,(f1,None)), (D,(d1,None)), (A,(a1,Some(a2))), (C,(c1,Some(c1))), (C,(c1,Some(c2))), (C,(c1,Some(c3))), (E,(e1,Some(e2))))

scala> b.leftOuterJoin(a).collect
res5: Array[(String, (String, Option[String]))] = Array((B,(b2,Some(b1))), (A,(a2,Some(a1))), (C,(c1,Some(c1))), (C,(c2,Some(c1))), (C,(c3,Some(c1))), (E,(e2,Some(e1))))

2.4.4 rightOuterJoin

即右外连接,与左外连接相反。

2.4.5 fullOuterJoin

即全外连接,它是是左右外连接的并集。

如果一个RDD需要在多次连接操作中使用,对该RDD分区并持久化分区后的RDD是有益的,它可以避免不必要的shuffle。

1
2
3
4
5
6
7
8
9
10
11
scala> val a = sc.parallelize(Array(("A","a1"),("B","b1"),("C","c1"),("D","d1"),("E","e1"),("F","f1")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[49] at parallelize at <console>:24

scala> val b = sc.parallelize(Array(("A","a2"),("B","b2"),("C","c1"),("C","c2"),("C","c3"),("E","e2")))
b: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[50] at parallelize at <console>:24

scala> a.fullOuterJoin(b).collect
res15: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b1),Some(b2))), (F,(Some(f1),None)), (D,(Some(d1),None)), (A,(Some(a1),Some(a2))), (C,(Some(c1),Some(c1))), (C,(Some(c1),Some(c2))), (C,(Some(c1),Some(c3))), (E,(Some(e1),Some(e2))))

scala> b.fullOuterJoin(a).collect
res16: Array[(String, (Option[String], Option[String]))] = Array((B,(Some(b2),Some(b1))), (F,(None,Some(f1))), (D,(None,Some(d1))), (A,(Some(a2),Some(a1))), (C,(Some(c1),Some(c1))), (C,(Some(c2),Some(c1))), (C,(Some(c3),Some(c1))), (E,(Some(e2),Some(e1))))

2.5 数据排序:

在基本类型RDD中,sortBy()可以用来排序,max()min()则可以用来方便地获取最大值和最小值。另外,在OrderedRDDFunctions中,存在一个sortByKey()可以方便地对键值对RDD进行排序,通过spark提供的隐式转换函数可以将RDD自动地转换为OrderedRDDFunctions,并随意地使用它的排序功能。

2.6行动操作:

键值对RDD提供了一些额外的行动操作供我们随意使用。如下:

2.6.1 countByKey

对每个键对应的元素分别计数。

2.6.2 collectAsMap

将结果以Map的形式返回,以便查询。

1
2
3
4
5
6
7
scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), 
    (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] =
    ParallelCollectionRDD[26] at parallelize at <console>:12

scala> data.collectAsMap
res28: scala.collection.Map[Int,String] = Map(2 -> com, 1 -> com, 3 -> good)

从结果我们可以看出,如果RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value。

2.6.3 lookup:

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

1
2
3
4
5
6
7
8
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)

scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)

三、数值Rdd

DoubleRDDFunctions为包含数值数据的RDD提供了一些描述性的统计操作,RDD可以通过隐式转换方便地使用这些方便的功能。

这些数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用stats()时通过一次遍历数据计算出来,并以StatCounter对象返回。如果你只想计算这些统计数据中的一个,也可以直接对RDD调用对应的方法。更多信息参见Spark API。

参考

spark使用总结

DataFrame函数

[TOC]

一、Action 操作

1.1 collect()

返回值是一个数组,返回dataframe集合所有的行

1.2 collectAsList()

返回值是一个java类型的数组,返回dataframe集合所有的行

1.3 count()

返回一个number类型的,返回dataframe集合的行数

1.4 describe(cols: String*)

描述某些列的count, mean, stddev, min, max。这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算。

1
df.describe("age", "height").show()

1.5 first()

返回第一行 ,类型是row类型

1.6 head()

返回第一行 ,类型是row类型

1.7 head(n:Int)

返回n行 ,类型是row 类型

1.8 show()

返回dataframe集合的值 默认是20行,返回类型是unit

1.9 show(n:Int)

返回n行,返回值类型是unit

1.10 table(n:Int)

返回n行 ,类型是row 类型

二、 dataframe的基本操作

2.1 cache()

将DataFrame缓存到内存中,以便之后加速运算

2.2 columns

返回一个string类型的数组,返回值是所有列的名字

2.3 dtypes

返回一个string类型的二维数组,返回值是所有列的名字以及类型

2.4 explan()

打印物理执行计划

2.5 explain(n:Boolean)

输入值为 false 或者true ,返回值是unit 默认是false ,如果输入true 将会打印 逻辑的和物理的

2.6 isLocal

返回值是Boolean类型,如果允许模式是local返回true 否则返回false

2.7 persist(newlevel:StorageLevel)

与cache类似。不同的是:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

2.8 printSchema()

打印出字段名称和类型 按照树状结构来打印

2.9 registerTempTable(tablename:String)

返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了

2.10 schema

返回structType 类型,将字段名称和类型按照结构体类型返回

2.11 toDF()

返回一个新的dataframe类型的

2.12 toDF(colnames:String*)

将参数中的几个字段返回一个新的dataframe类型的,

2.13 unpersist()

返回dataframe.this.type 类型,去除模式中的数据

2.14 unpersist(blocking:Boolean)

返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

##三、集成查询:

3.1 数据说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
import spark.implicits._
val df = spark.read.format("csv").option("header", true).load("/user/hadoop/csvdata/csvdata")
df.show()
scala >
scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----+

3.2 agg

在整个数据集范围类进行聚合操作。该函数相当于:ds.groupBy().agg(…)

  • 函数原型

1、 agg(expers:column*)

1
2
df.agg(max("age"), avg("salary"))
df.groupBy().agg(max("age"), avg("salary"))

2、 agg(exprs: Map[String, String])

1
2
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))

3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)

1
2
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
  • 例子:
1
2
3
4
5
6
scala> df.agg("score"->"avg", "score"->"max", "score"->"min", "score"->"count").show()
+----------+----------+----------+------------+
|avg(score)|max(score)|min(score)|count(score)|
+----------+----------+----------+------------+
| 40.0| 90| 10| 12|
+----------+----------+----------+------------+

3.3 groupBy

使用指定的列对数据集进行分组,以便我们可以对其进行聚合。请参阅RelationalGroupedDataset获取所有可用的聚合函数。
这是groupBy的一个变体,它只能使用列名称对现有列进行分组(即不能构造表达式)。

  • 函数原型

1、def groupBy(col1: String, cols: String): RelationalGroupedDataset
2、def groupBy(cols: Column
): RelationalGroupedDataset

1
2
df.groupBy("age").agg(Map("age" ->"count")).show();
df.groupBy("age").avg().show();
  • 例子1

在使用groupBy函数时,一般都是先分组,在使用agg等聚合函数对数据进行聚合。
按name字段进行聚合,然后再使用agg聚合函数进行聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> df.groupBy("name").agg("score"->"avg").sort("name").show()
+----+----------+
|name|avg(score)|
+----+----------+
| n1| 10.0|
| n2| 20.0|
| n3| 25.0|
| n4| 40.0|
| n5| 50.0|
| n6| 50.0|
| n8| 90.0|
| n9| 40.0|
+----+----------+
  • 例子2

按id和name两个字段对数据集进行分组,然后求score列的平均值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> df.groupBy("id","name").agg("score"->"avg").sort("name").show()
+---+----+----------+
| id|name|avg(score)|
+---+----+----------+
| 1| n1| 10.0|
| 2| n2| 20.0|
| 3| n3| 25.0|
| 4| n4| 40.0|
| 5| n5| 50.0|
| 7| n6| 40.0|
| 6| n6| 60.0|
| 8| n8| 90.0|
| 9| n9| 45.0|
| 8| n9| 30.0|
+---+----+----------+
  • 例子3

计算每个subject的平均分数:

1
2
3
4
5
6
7
8
scala> df.groupBy("subject").agg("score"->"avg").sort("subject").show()
+-------+------------------+
|subject| avg(score)|
+-------+------------------+
| s1| 28.0|
| s2| 42.5|
| s3|56.666666666666664|
+-------+------------------+
  • 例子4

同时计算多个列值的平均值,最小值,最大值。
(注:我这里用的是同一列,完全可以是不同列)

1
2
3
4
5
6
7
8
scala> df.groupBy("subject").agg("score"->"avg", "score"->"max", "score"->"min", "score"->"count").sort("subject").show()
+-------+------------------+----------+----------+------------+
|subject| avg(score)|max(score)|min(score)|count(score)|
+-------+------------------+----------+----------+------------+
| s1| 28.0| 60| 10| 5|
| s2| 42.5| 70| 20| 4|
| s3|56.666666666666664| 90| 30| 3|
+-------+------------------+----------+----------+------------+

3.4 apply 和 col

根据列名选择列并将其作为列返回。

  • 函数原型
1
2
def apply(colName: String): Column 
def col(colName: String): Column
  • 例子1
1
2
3
4
5
scala> df.apply("name")
res11: org.apache.spark.sql.Column = name

scala> df.col("name")
res16: org.apache.spark.sql.Column = name

3.7 cube

使用指定的列为当前数据集创建一个多维数据集,因此我们可以对它们运行聚合。请参阅RelationalGroupedDataset获取所有可用的聚合函数。
这是立方体的变体,只能使用列名称对现有列进行分组(即不能构造表达式)。

  • 原型
1
2
def cube(col1: String, cols: String*): RelationalGroupedDataset 
def cube(cols: Column*): RelationalGroupedDataset
  • 例子1
1
2
scala> df.cube("name", "score")
res18: org.apache.spark.sql.RelationalGroupedDataset = org.apache.spark.sql.RelationalGroupedDataset@3f88db17

3.8 drop

删除数据集中的某个列。

  • 函数原型
1
2
3
def drop(col: Column): DataFrame 
def drop(colNames: String*): DataFrame
def drop(colName: String): DataFrame
  • 例子1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> df.drop("id").show()
+----+-------+-----+
|name|subject|score|
+----+-------+-----+
| n1| s1| 10|
| n2| s2| 20|
| n3| s3| 30|
| n3| s1| 20|
| n4| s2| 40|
| n5| s3| 50|
| n6| s1| 60|
| n6| s2| 40|
| n8| s3| 90|
| n9| s1| 30|
| n9| s1| 20|
| n9| s2| 70|
+----+-------+-----+

3.9 join

  • join类型的说明
    内连接 : 只连接匹配的行
    左外连接 : 包含左边表的全部行(不管右边的表中是否存在与它们匹配的行),以及右边表中全部匹配的行
    右外连接 : 包含右边表的全部行(不管左边的表中是否存在与它们匹配的行),以及左边表中全部匹配的行
    全外连接 : 包含左、右两个表的全部行,不管另外一边的表中是否存在与它们匹配的行。
  • 功能说明
    使用给定的连接表达式连接另一个DataFrame。以下执行df1和df2之间的完整外部联接。
    使用给定的连接表达式与另一个DataFrame进行内部连接。
    使用给定的列与另一个DataFrame进行设置连接。
    加入另一个DataFrame。
  • 函数原型
1
2
3
4
5
6
7
8
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame 
def join(right: Dataset[_], joinExprs: Column): DataFrame
def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame

def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
// 内部使用给定的列与另一个DataFrame进行同等连接。
def join(right: Dataset[_], usingColumn: String): DataFrame
def join(right: Dataset[_]): DataFrame

注意:这里的joinType必须是这几个中的一个:inner, outer, left_outer, right_outer, leftsemi.

  • 例子1
    该例子演示inner join。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----+

scala> val df2 = df.select("id", "subject","score")
df2: org.apache.spark.sql.DataFrame = [id: string, subject: string ... 1 more field]

scala> df2.show()
+---+-------+-----+
| id|subject|score|
+---+-------+-----+
| 1| s1| 10|
| 2| s2| 20|
| 3| s3| 30|
| 3| s1| 20|
| 4| s2| 40|
| 5| s3| 50|
| 6| s1| 60|
| 7| s2| 40|
| 8| s3| 90|
| 8| s1| 30|
| 9| s1| 20|
| 9| s2| 70|
+---+-------+-----+

scala> val df3 = df.join(df2, df("id")===df2("id"))
17/12/03 21:40:59 WARN Column: Constructing trivially true equals predicate, 'id#0 = id#0'. Perhaps you need to use aliases.
df3: org.apache.spark.sql.DataFrame = [id: string, name: string ... 5 more fields]

scala> df3.show()
+---+----+-------+-----+---+-------+-----+
| id|name|subject|score| id|subject|score|
+---+----+-------+-----+---+-------+-----+
| 1| n1| s1| 10| 1| s1| 10|
| 2| n2| s2| 20| 2| s2| 20|
| 3| n3| s3| 30| 3| s1| 20|
| 3| n3| s3| 30| 3| s3| 30|
| 3| n3| s1| 20| 3| s1| 20|
| 3| n3| s1| 20| 3| s3| 30|
| 4| n4| s2| 40| 4| s2| 40|
| 5| n5| s3| 50| 5| s3| 50|
| 6| n6| s1| 60| 6| s1| 60|
| 7| n6| s2| 40| 7| s2| 40|
| 8| n8| s3| 90| 8| s1| 30|
| 8| n8| s3| 90| 8| s3| 90|
| 8| n9| s1| 30| 8| s1| 30|
| 8| n9| s1| 30| 8| s3| 90|
| 9| n9| s1| 20| 9| s2| 70|
| 9| n9| s1| 20| 9| s1| 20|
| 9| n9| s2| 70| 9| s2| 70|
| 9| n9| s2| 70| 9| s1| 20|
+---+----+-------+-----+---+-------+-----+

scala> val df4 = df2.limit(6)
df4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, subject: string ... 1 more field]

scala> df4.show()
+---+-------+-----+
| id|subject|score|
+---+-------+-----+
| 1| s1| 10|
| 2| s2| 20|
| 3| s3| 30|
| 3| s1| 20|
| 4| s2| 40|
| 5| s3| 50|
+---+-------+-----+

scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----+

scala> val df6 = df.join(df4, "id")
df6: org.apache.spark.sql.DataFrame = [id: string, name: string ... 4 more fields]

scala> df6.show()
+---+----+-------+-----+-------+-----+
| id|name|subject|score|subject|score|
+---+----+-------+-----+-------+-----+
| 1| n1| s1| 10| s1| 10|
| 2| n2| s2| 20| s2| 20|
| 3| n3| s3| 30| s1| 20|
| 3| n3| s3| 30| s3| 30|
| 3| n3| s1| 20| s1| 20|
| 3| n3| s1| 20| s3| 30|
| 4| n4| s2| 40| s2| 40|
| 5| n5| s3| 50| s3| 50|
+---+----+-------+-----+-------+-----+
  • 例子2
    本例说明left_outer的使用和结果。
    注意:数据集df4和df与上例的相同。
    小结:通过例子可以看到,left_outer的效果是,保留左边表格的所有id,即使右边的表没有这些id(关联字段的值)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
scala> df4.show()
+---+-------+-----+
| id|subject|score|
+---+-------+-----+
| 1| s1| 10|
| 2| s2| 20|
| 3| s3| 30|
| 3| s1| 20|
| 4| s2| 40|
| 5| s3| 50|
+---+-------+-----+

scala> df.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----+

scala> val df7 = df.join(df4, df("id")===df4("id"), "left_outer")
17/12/03 21:53:40 WARN Column: Constructing trivially true equals predicate, 'id#0 = id#0'. Perhaps you need to use aliases.
df7: org.apache.spark.sql.DataFrame = [id: string, name: string ... 5 more fields]

scala> df7.show()
+---+----+-------+-----+----+-------+-----+
| id|name|subject|score| id|subject|score|
+---+----+-------+-----+----+-------+-----+
| 1| n1| s1| 10| 1| s1| 10|
| 2| n2| s2| 20| 2| s2| 20|
| 3| n3| s3| 30| 3| s1| 20|
| 3| n3| s3| 30| 3| s3| 30|
| 3| n3| s1| 20| 3| s1| 20|
| 3| n3| s1| 20| 3| s3| 30|
| 4| n4| s2| 40| 4| s2| 40|
| 5| n5| s3| 50| 5| s3| 50|
| 6| n6| s1| 60|null| null| null|
| 7| n6| s2| 40|null| null| null|
| 8| n8| s3| 90|null| null| null|
| 8| n9| s1| 30|null| null| null|
| 9| n9| s1| 20|null| null| null|
| 9| n9| s2| 70|null| null| null|
+---+----+-------+-----+----+-------+-----+

3.10 na

返回一个DataFrameNaFunctions以处理丢失的数据。

  • 函数原型
1
def na: DataFrameNaFunctions

注意:该函数会返回一个类型的类,该类包含了各种操作空列的函数。
这些函数包括:drop(),fill(),replace(),fillCol(),replaceCol()

  • 例子1
1
2
// 删除包含任何空值的行
scala> df.na.drop()
  • 例子2
1
2
// 使用常量值填充空值
scala> df.na.fill("null")

3.11 select

选择一组列。注意:该函数返回的是一个DataFrame类。

  • 函数原型
1
2
3
4
5
6
7
8
// 这是select的一个变体,只能使用列名选择现有的列(即不能构造表达式)。
def select(col: String, cols: String*): DataFrame

// 选择一组基于列的表达式。
def select(cols: Column*): DataFrame

// 选择一组SQL表达式。这是接受SQL表达式的select的一个变体。
def selectExpr(exprs: String*): DataFrame
  • 例子1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> df.select("id", "score").show()
+---+-----+
| id|score|
+---+-----+
| 1| 10|
| 2| 20|
| 3| 30|
| 3| 20|
| 4| 40|
| 5| 50|
| 6| 60|
| 7| 40|
| 8| 90|
| 8| 30|
| 9| 20|
| 9| 70|
+---+-----+
  • 例子2:对select的列值进行操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> df.select($"id", $"score"*10).show()
+---+------------+
| id|(score * 10)|
+---+------------+
| 1| 100.0|
| 2| 200.0|
| 3| 300.0|
| 3| 200.0|
| 4| 400.0|
| 5| 500.0|
| 6| 600.0|
| 7| 400.0|
| 8| 900.0|
| 8| 300.0|
| 9| 200.0|
| 9| 700.0|
+---+------------+
  • 例子3:selectExpr的使用(select表达式)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ds.selectExpr("colA", "colB as newName", "abs(colC)")

ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

scala> df.selectExpr("id", "score * 10").show()
+---+------------+
| id|(score * 10)|
+---+------------+
| 1| 100.0|
| 2| 200.0|
| 3| 300.0|
| 3| 200.0|
| 4| 400.0|
| 5| 500.0|
| 6| 600.0|
| 7| 400.0|
| 8| 900.0|
| 8| 300.0|
| 9| 200.0|
| 9| 700.0|
+---+------------+


scala> df.selectExpr("id", "score as points").show()
+---+------+
| id|points|
+---+------+
| 1| 10|
| 2| 20|
| 3| 30|
... ...

3.12 withColumn 和 withColumnRenamed

通过添加一列或替换具有相同名称的现有列来返回新的数据集。
withColumnRenamed只是重命名列。

  • 函数原型
1
2
def withColumn(colName: String, col: Column): DataFrame 
def withColumnRenamed(existingName: String, newName: String): DataFrame 12
  • 例子1:通过重命名现有列来添加新列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala> val df8 = df.withColumn("subs", df("subject"))
df8: org.apache.spark.sql.DataFrame = [id: string, name: string ... 3 more fields]

scala> df8.show()
+---+----+-------+-----+----+
| id|name|subject|score|subs|
+---+----+-------+-----+----+
| 1| n1| s1| 10| s1|
| 2| n2| s2| 20| s2|
| 3| n3| s3| 30| s3|
| 3| n3| s1| 20| s1|
| 4| n4| s2| 40| s2|
| 5| n5| s3| 50| s3|
| 6| n6| s1| 60| s1|
| 7| n6| s2| 40| s2|
| 8| n8| s3| 90| s3|
| 8| n9| s1| 30| s1|
| 9| n9| s1| 20| s1|
| 9| n9| s2| 70| s2|
+---+----+-------+-----+----+
  • 例子2:重命名现有列,但不添加新列

从下面的例子中可以看出,把score列的值替换了,但并没有添加新的列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
scala> val df9 = df.withColumn("score", df("score")/100)
df9: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> df9.show()
+---+----+-------+-----+
| id|name|subject|score|
+---+----+-------+-----+
| 1| n1| s1| 0.1|
| 2| n2| s2| 0.2|
| 3| n3| s3| 0.3|
| 3| n3| s1| 0.2|
| 4| n4| s2| 0.4|
| 5| n5| s3| 0.5|
| 6| n6| s1| 0.6|
| 7| n6| s2| 0.4|
| 8| n8| s3| 0.9|
| 8| n9| s1| 0.3|
| 9| n9| s1| 0.2|
| 9| n9| s2| 0.7|
+---+----+-------+-----+


// 也可以直接通过withColumnRenamed进行重命名
scala> val df9 = df.withColumnRenamed("score","score2")
df9: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> df9.show()
+---+----+-------+------+
| id|name|subject|score2|
+---+----+-------+------+
| 1| n1| s1| 10|
| 2| n2| s2| 20|
| 3| n3| s3| 30|
| 3| n3| s1| 20|
| 4| n4| s2| 40|
| 5| n5| s3| 50|
| 6| n6| s1| 60|
| 7| n6| s2| 40|
| 8| n8| s3| 90|
| 8| n9| s1| 30|
| 9| n9| s1| 20|
| 9| n9| s2| 70|
+---+----+-------+-----

3.13 stat

为工作统计功能支持返回一个DataFrameStatFunctions。
该类的函数包括:approxQuantile,corr,cov,freqItems,sampleBy,countMinSketch,bloomFilter,buildBloomFilter等

  • 函数原型
1
def stat: DataFrameStatFunctions
  • 例子1
1
2
3
4
5
6
7
8
9
10
11
12
scala> val cols = Array("score")
cols: Array[String] = Array(score)

scala> df.stat.freqItems(cols)
res56: org.apache.spark.sql.DataFrame = [score_freqItems: array<string>]

scala> df.stat.freqItems(cols).show()
+--------------------+
| score_freqItems|
+--------------------+
|[90, 30, 60, 50, ...|
+--------------------+

3.14 其他

  • as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名

  • distinct 去重 返回一个dataframe类型

  • dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe

  • except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的

  • explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分

    1
    2
    df.explode("name","names") {name :String=> name.split(" ")}.show();
    将name字段根据空格来拆分,拆分的字段放在names里面
  • filter(conditionExpr: String): 刷选部分数据,返回dataframe类型

    1
    2
    3
    df.filter("age>10").show();   
    df.filter(df("age")>10).show();
    df.where(df("age")>10).show();
  • intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
  • limit(n: Int) 返回dataframe类型 去n 条数据出来
  • orderBy(sortExprs: Column*) 做alise排序
  • sort(sortExprs: Column*) 排序 df.sort(df(“age”).desc).show(); 默认是asc
  • unionAll(other:Dataframe) 合并 df.unionAll(ds).show();