0%

时钟不同步造成的异常

异常信息:

This scheduler instance () is still active but was recovered by another instance in the cluster

对应源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
protected boolean doCheckin() throws JobPersistenceException {
boolean transOwner = false;
boolean transStateOwner = false;
boolean recovered = false;

Connection conn = getNonManagedTXConnection();
try {
// Other than the first time, always checkin first to make sure there is
// work to be done before we acquire the lock (since that is expensive,
// and is almost never necessary). This must be done in a separate
// transaction to prevent a deadlock under recovery conditions.
List<SchedulerStateRecord> failedRecords = null;
if (!firstCheckIn) {
failedRecords = clusterCheckIn(conn);
commitConnection(conn);
}

if (firstCheckIn || (failedRecords.size() > 0)) {
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;

// Now that we own the lock, make sure we still have work to do.
// The first time through, we also need to make sure we update/create our state record
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);

if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
//getLockHandler().obtainLock(conn, LOCK_JOB_ACCESS);
transOwner = true;

clusterRecover(conn, failedRecords);
recovered = true;
}
}

commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
throw e;
} finally {
try {
releaseLock(LOCK_TRIGGER_ACCESS, transOwner);
} finally {
try {
releaseLock(LOCK_STATE_ACCESS, transStateOwner);
} finally {
cleanupConnection(conn);
}
}
}

firstCheckIn = false;

return recovered;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
protected List<SchedulerStateRecord> findFailedInstances(Connection conn)
throws JobPersistenceException {
try {
List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
boolean foundThisScheduler = false;
long timeNow = System.currentTimeMillis();

// 获取 qrzt_scheduler_state 表中,记录。对应sql是:SELECT * FROM QRTZ_SCHEDULER_STATE WHERE SCHED_NAME = 'zl',其中SCHED_NAME是配置文件中的org.quartz.scheduler.instanceName值
List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);

for(SchedulerStateRecord rec: states) {

// find own record...
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
foundThisScheduler = true;
if (firstCheckIn) {
failedInstances.add(rec);
}
} else {
// find failed instances...
if (calcFailedIfAfter(rec) < timeNow) {
failedInstances.add(rec);
}
}
}

// The first time through, also check for orphaned fired triggers.
if (firstCheckIn) {
failedInstances.addAll(findOrphanedFailedInstances(conn, states));
}

// If not the first time but we didn't find our own instance, then
// Someone must have done recovery for us.
// !foundThisScheduler 表示 应用程序没有找到 自己的 instance
// !firstCheckIn 表示 应该表示 应用程序是否为第一次checkIn
if ((!foundThisScheduler) && (!firstCheckIn)) {
// FUTURE_TODO: revisit when handle self-failed-out impl'ed (see FUTURE_TODO in clusterCheckIn() below)
getLog().warn(
"This scheduler instance (" + getInstanceId() + ") is still " +
"active but was recovered by another instance in the cluster. " +
"This may cause inconsistent behavior.");
}

return failedInstances;
} catch (Exception e) {
lastCheckin = System.currentTimeMillis();
throw new JobPersistenceException("Failure identifying failed instances when checking-in: "
+ e.getMessage(), e);
}
}

spark使用教程

文章来源:spark使用教程

本文是spark的使用教程,文中主要用scala来讲解spark,并且会尽量覆盖较新版本的spark的内容。这篇文章主要记录了一些我平时学到的spark知识,虽然较长,但它并没有包含spark的方方面面,更多更全的spark教程和信息请在spark官网观看。

第一个Spark程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 功能:用spark实现的单词计数程序
* 环境:spark 1.6.1, scala 2.10.4
*/

// 导入相关类库
import org.apache.spark._

object WordCount {
def main(args: Array[String]) {
// 建立spark运行上下文
val sc = new SparkContext("local[3]", "WordCount", new SparkConf())

// 加载数据,创建RDD
val inRDD = sc.textFile("words.txt", 3)

// 对RDD进行转换,得到最终结果
val res = inRDD.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _)

// 将计算结果collect到driver节点,并打印
res.collect.foreach(println)

// 停止spark运行上下文
sc.stop()
}
}

关于RDD

弹性分布式数据集(RDD)是分布式处理的一个数据集的抽象,RDD是只读的,在RDD之上的操作都是并行的。实际上,RDD只是一个逻辑实体,其中存储了分布式数据集的一些信息,并没有包含所谓的“物理数据”,“物理数据”只有在RDD被计算并持久化之后才存在于内存或磁盘中。RDD的重要内部属性有:

  • 计算RDD分区的函数。
  • 所依赖的直接父RDD列表。
  • RDD分区及其地址列表。
  • RDD分区器。
  • RDD分区优先位置。

RDD操作起来与Scala集合类型没有太大差别,这就是Spark追求的目标:像编写单机程序一样编写分布式程序,但它们的数据和运行模型有很大的不同,用户需要具备更强的系统把控能力和分布式系统知识。

Transformation与Action

RDD提供了两种类型的操作:transformation操作(转化操作)和action操作(行动操作)。transformation操作是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD。action操作则是得到其他数据类型的结果。

所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,spark在内部只是用新的RDD记录这些transformation操作并形成RDD对象的有向无环图(DAG),计算只有在action被提交的时候才被触发。实际上,我们不应该把RDD看作存放着特定数据的数据集,而最好把每个RDD当作我们通过transformation操作构建出来的、记录如何计算数据的指令列表。

RDD的action算子会触发一个新的job,spark会在DAG中寻找是否有cached或者persisted的中间结果,如果没有找到,那么就会重新执行这些中间过程以重新计算该RDD。因此,如果想在多个action操作中重用同一个RDD,那么最好使用 cache()/persist()将RDD缓存在内存中,但如果RDD过大,那么最好使用 persist(StorageLevel.MEMORY_AND_DISK) 代替。注意cache/persist仅仅是设置RDD的存储等级,因此你应该在第一次调用action之前调用cache/persist。cache/persist使得中间计算结果存在内存中,这个才是说为啥Spark是内存计算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允许你把中间结果放内存里。

在spark程序中打印日志时,尤其需要注意打印日志的代码很有可能使用到了action算子,如果没有缓存中间RDD就可能导致程序的效率大大降低。另外,如果一个RDD的计算过程中有抽样、随机值或者其他形式的变化,那么一定要缓存中间结果,否则程序执行结果可能都是不准确的!

参考链接及进一步阅读:

RDD持久化(缓存)

正如在转化和行动操作部分所说的一样,为了避免在一个RDD上多次调用action操作从而可能导致的重新计算,我们应该将该RDD在第一次调用action之前进行持久化。对RDD进行持久化对于迭代式和交互式应用非常有好处,好处大大滴有。

持久化可以使用cache()或者persist()。默认情况下的缓存级别为MEMORY_ONLY,spark会将对象直接缓存在JVM的堆空间中,而不经过序列化处理。我们可以给persist()传递持久化级别参数以指定的方式持久化RDD。MEMORY_AND_DISK持久化级别尽量将RDD缓存在内存中,如果内存缓存不下了,就将剩余分区缓存在磁盘中。MEMORY_ONLY_SER将RDD进行序列化处理(每个分区序列化为一个字节数组)然后缓存在内存中。还有MEMORY_AND_DISK_SER等等很多选项。选择持久化级别的原则是:尽量选择缓存在内存中,如果内存不够,则首选序列化内存方式,除非RDD分区重算开销比缓存到磁盘来的更大(很多时候,重算RDD分区会比从磁盘中读取要快)或者序列化之后内存还是不够用,否则不推荐缓存到磁盘上。

如果要缓存的数据太多,内存中放不下,spark会自动利用最近最少使用(LRU)策略把最老的分区从内存中移除。对于仅放在内存中的缓存级别,下次要用到已被移除的分区时,这些分区就需要重新计算。对于使用内存与磁盘的缓存级别,被移除的分区都会被写入磁盘。

另外,RDD还有一个unpersist()方法,用于手动把持久化的RDD从缓存中移除。

环境变量SPARK_LOCAL_DIRS用来设置RDD持久化到磁盘的目录,它同时也是shuffle的缓存目录。

各种RDD与RDD操作

基本RDD

抽象类RDD包含了各种数据类型的RDD都适用的通用操作。下面对基本类型RDD的操作进行分门别类地介绍。

针对各个元素的转化操作:

  • map: 对各个元素进行映射操作。
  • flatMap: 对各个元素进行映射操作,并将最后结果展平。
  • filter: 过滤不满足条件的元素。filter操作可能会引起数据倾斜,甚至可能导致空分区,新形成的RDD将会包含这些可能生成的空分区。所有这些都可能会导致问题,要想解决它们,最好在filter之后重新分区。

伪集合操作:

尽管RDD不是严格意义上的集合,但它支持许多数学上的集合操作。注意:这些操作都要求操作的RDD是相同的数据类型的。

  • distinct: 对RDD中的元素进行去重处理。需要注意的是,distinct操作开销很大,因为它需要shuffle所有数据,以确保每一个元素都只有一份。
  • union: 返回一个包含两个或多个RDD中所有元素的RDD。spark的union并不会去重,这点与数学上的不同。
  • intersection: 返回两个RDD中都有的元素。intersection会在运行时除去所有重复的元素,因此它也需要shuffle,性能要差一些。
  • subtract: 返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle。
  • cartesian: 计算两个RDD的笛卡尔积。需要注意的是,求大规模RDD的笛卡尔积开销巨大。
  • sample: 对RDD进行采样,返回一个采样RDD。

基于分区的转化操作:

  • glom: 将每个分区中的所有元素都形成一个数组。如果在处理当前元素时需要使用前后的元素,该操作将会非常有用,不过有时我们可能还需要将分区边界的数据收集起来并广播到各节点以备使用。
  • mapPartitions: 基于分区的map,spark会为操作分区的函数该分区的元素的迭代器。
  • mapPartitionsWithIndex: 与mapPartitions不同之处在于带有分区的序号。

管道(pipe)操作:

spark在RDD上提供了pipe()方法。通过pipe(),你可以使用任意语言将RDD中的各元素从标准输入流中以字符串形式读出,并将这些元素执行任何你需要的操作,然后把结果以字符串形式写入标准输出,这个过程就是RDD的转化操作过程。

使用pipe()的方法很简单,假如我们有一个用其他语言写成的从标准输入接收数据并将处理结果写入标准输出的可执行脚本,我们只需要将该脚本分发到各个节点相同路径下,并将其路径作为pipe()的参数传入即可。

行动操作:

  • foreach: 对每个元素进行操作,并不会返回结果。
  • foreachPartition: 基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。
  • reduce: 对RDD中所有元素进行规约,最终得到一个规约结果。reduce接收的规约函数要求其返回值类型与RDD中元素类型相同。
  • fold: 与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold同样要求规约函数返回值类型与RDD元素类型相同。
  • aggregate: 与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。
  • count: 返回RDD元素个数。
  • collect: 收集RDD的元素到driver节点,如果数据有序,那么collect得到的数据也会是有序的。大数据量最好不要使用RDD的collect,因为它会在本机上生成一个新的Array,以存储来自各个节点的所有数据,此时更好的办法是将数据存储在HDFS等分布式持久化层上。
  • take: 返回指定数量的元素到driver节点。它会尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。需要注意的是,该操作返回元素的顺序与你预期的可能不一样。
  • top: 如果为元素定义了顺序,就可以使用top返回前几个元素。
  • takeSample: 返回采样数据。

键值对RDD

PairRDDFunctions封装了用于操作键值对RDD的一些功能函数。一些文件读取操作(sc.sequenceFile()等)会直接返回RDD[(K, V)]类型。在RDD上使用map操作也可以将一个RDD转换为RDD[(K, V)]类型。在用Scala书写的Spark程序中,RDD[(K, V)]类型到PairRDDFunctions类型的转换一般由隐式转换函数完成。

基本类型RDD的操作同样适用于键值对RDD。下面对键值对类型RDD特有的操作进行分门别类地介绍。

针对各个元素的转化操作:

  • mapValues: 对各个键值对的值进行映射。该操作会保留RDD的分区信息。
  • flatMapValues: 对各个键值对的值进行映射,并将最后结果展平。该操作会保留RDD的分区信息。

聚合操作:

  • reduceByKey: 与reduce相当类似,它们都接收一个函数,并使用该函数对值进行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。
  • foldByKey: 与fold类似,就像reduceByKey之于reduce那样。熟悉MapReduce中的合并器(combiner)概念的你可能已经注意到,reduceByKey和foldByKey会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并。用户不需要指定合并器。更泛化的combineByKey可以让你自定义合并的行为。
  • combineByKey: 是最常用的基于键进行聚合的函数,大多数基于键聚合的函数都是用它实现的。与aggregate一样,combineByKey可以让用户返回与输入数据的类型不同的返回值。combineByKey的内部实现分为三步来完成:首先根据是否需要在map端进行combine操作决定是否对RDD先进行一次mapPartitions操作(利用createCombiner、mergeValue、mergeCombiners三个函数)来达到减少shuffle数据量的作用。第二步根据partitioner对MapPartitionsRDD进行shuffle操作。最后在reduce端对shuffle的结果再进行一次combine操作。

数据分组:

  • groupBy: 根据自定义的东东进行分组。groupBy是基本RDD就有的操作。
  • groupByKey: 根据键对数据进行分组。虽然groupByKey+reduce也可以实现reduceByKey一样的效果,但是请你记住:groupByKey是低效的,而reduceByKey会在本地先进行聚合,然后再通过网络传输求得最终结果。

在执行聚合或分组操作时,可以指定分区数以对并行度进行调优。

连接:

  • cogroup: 可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。
  • join: 对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。
  • leftOuterJoin: 即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。
  • rightOuterJoin: 即右外连接,与左外连接相反。
  • fullOuterJoin: 即全外连接,它是是左右外连接的并集。

如果一个RDD需要在多次连接操作中使用,对该RDD分区并持久化分区后的RDD是有益的,它可以避免不必要的shuffle。

数据排序:

在基本类型RDD中,sortBy()可以用来排序,max()min()则可以用来方便地获取最大值和最小值。另外,在OrderedRDDFunctions中,存在一个sortByKey()可以方便地对键值对RDD进行排序,通过spark提供的隐式转换函数可以将RDD自动地转换为OrderedRDDFunctions,并随意地使用它的排序功能。

行动操作:

键值对RDD提供了一些额外的行动操作供我们随意使用。如下:

  • countByKey: 对每个键对应的元素分别计数。
  • collectAsMap: 将结果以Map的形式返回,以便查询。
  • lookup: 返回给定键对应的所有值。

数值RDD

DoubleRDDFunctions为包含数值数据的RDD提供了一些描述性的统计操作,RDD可以通过隐式转换方便地使用这些方便的功能。

这些数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用stats()时通过一次遍历数据计算出来,并以StatCounter对象返回。如果你只想计算这些统计数据中的一个,也可以直接对RDD调用对应的方法。更多信息参见Spark API。

RDD依赖、窄宽依赖

RDD依赖与DAG

一系列转化操作形成RDD的有向无环图(DAG),行动操作触发作业的提交与执行。每个RDD维护了其对直接父RDD(一个或多个)的依赖,其中包含了父RDD的引用和依赖类型信息,通过dependencies()我们可以获取对应RDD的依赖,其返回一个依赖列表。

通过RDD的父RDD引用就可以从DAG上向前回溯找到其所有的祖先RDD。spark提供了toDebugString方法来查看RDD的谱系。对于如下一段简单的代码:

1
2
3
val input = sc.parallelize(1 to 10)
val repartitioned = input.repartition(2)
val sum = repartitioned.sum

我们就可以通过在RDD上调用toDebugString来查看其依赖以及转化关系,结果如下:

1
2
3
4
5
6
7
8
9
10
// input.toDebugString
res0: String = (4) ParallelCollectionRDD[0] at parallelize at <console>:21 []

// repartitioned.toDebugString
res1: String =
(2) MapPartitionsRDD[4] at repartition at <console>:23 []
| CoalescedRDD[3] at repartition at <console>:23 []
| ShuffledRDD[2] at repartition at <console>:23 []
+-(4) MapPartitionsRDD[1] at repartition at <console>:23 []
| ParallelCollectionRDD[0] at parallelize at <console>:21 []

上述repartitioned的依赖链存在两个缩进等级。同一缩进等级的转化操作构成一个Stage(阶段),它们不需要混洗(shuffle)数据,并可以流水线执行(pipelining)。

窄依赖和宽依赖

spark中RDD之间的依赖分为窄(Narrow)依赖宽(Wide)依赖两种。我们先放出一张示意图:

窄依赖和宽依赖

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。图中,map/filter和union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。

宽依赖指子RDD的分区依赖于父RDD的多个或所有分区,这是因为shuffle类操作,如图中的groupByKey和未经协同划分的join。

窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier(路障)): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使 是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线(pipeline)优化。关于流水线优化,从MapPartitionsRDD中compute()的实现就可以看出端倪,该compute方法只是对迭代器进行复合,复合就是嵌套,因此数据处理过程就是对每条记录进行同样的嵌套处理直接得出所需结果,而没有中间计算结果,同时也要注意:依赖过长将导致嵌套过深,从而可能导致栈溢出。

转换算子序列一碰上shuffle类操作,宽依赖就发生了,流水线优化终止。在具体实现 中,DAGScheduler从当前算子往前回溯依赖图,一碰到宽依赖,就生成一个stage来容纳已遍历的算子序列。在这个stage里,可以安全地实施流水线优化。然后,又从那个宽依赖开始继续回溯,生成下一个stage。

另外,宽窄依赖的划分对spark的容错也具有重要作用,参见本文容错机制部分。

DAG到任务的划分

用户代码定义RDD的有向无环图,行动操作把DAG转译为执行计划,进一步生成任务在集群中调度执行。

具体地说,RDD的一系列转化操作形成RDD的DAG,在RDD上调用行动操作将触发一个Job(作业)的运行,Job根据DAG中RDD之间的依赖关系(宽依赖/窄依赖,也即是否发生shuffle)的不同将DAG划分为多个Stage(阶段),一个Stage对应DAG中的一个或多个RDD,一个Stage对应多个RDD是因为发生了流水线执行(pipelining),一旦Stage划分出来,Task(任务)就会被创建出来并发给内部的调度器,进而分发到各个executor执行,一个Stage会启动很多Task,每个Task都是在不同的数据分区上做同样的事情(即执行同样的代码段),Stage是按照依赖顺序处理的,而Task则是独立地启动来计算出RDD的一部分,一旦Job的最后一个Stage结束,一个行动操作也就执行完毕了。

Stage分为两种:ShuffleMapStageResultStageShuffleMapStage是非最终stage,后面还有其他的stage,所以它的输出一定是需要shuffle并作为后续stage的输入。ShuffleMapStage的最后Task就是ShuffleMapTaskResultStage是一个Job的最后一个Stage,直接生成结果或存储。ResultStage的最后Task就是ResultTask。一个Job含有一个或多个Stage,最后一个为ResultTask,其他都为ShuffleMapStage。

RDD不能嵌套

RDD嵌套是不被支持的,也即不能在一个RDD操作的内部再使用RDD。如果在一个RDD的操作中,需要访问另一个RDD的内容,你可以尝试join操作,或者将数据量较小的那个RDD广播(broadcast)出去。

你同时也应该注意到:join操作可能是低效的,将其中一个较小的RDD广播出去然后再join可以避免不必要的shuffle,俗称“小表广播”。

使用其他分区数据

由于RDD不能嵌套,这使得“在计算一个分区时,访问另一个分区的数据”成为一件困难的事情。那么有什么好的解决办法吗?请继续看。

spark依赖于RDD这种抽象模型进行粗粒度的并行计算,一般情况下每个节点的每次计算都是针对单一记录,当然也可以使用 RDD.mapPartition 来对分区进行处理,但都限制在一个分区内(当然更是一个节点内)。

spark的worker节点相互之间不能直接进行通信,如果在一个节点的计算中需要使用到另一个分区的数据,那么还是有一定的困难的。

你可以将整个RDD的数据全部广播(如果数据集很大,这可不是好办法),或者广播一些其他辅助信息;也可以从所有节点均可以访问到的文件(hdfs文件)或者数据库(关系型数据库或者hbase)中读取;更进一步或许你应该修改你的并行方案,使之满足“可针对拆分得到的小数据块进行并行的独立的计算,然后归并得到大数据块的计算结果”的MapReduce准则,在“划分大的数据,并行独立计算,归并得到结果”的过程中可能存在数据冗余之类的,但它可以解决一次性没法计算的大数据,并最终提高计算效率,hadoop和spark都依赖于MapReduce准则。

对RDD进行分区

何时进行分区?

spark程序可以通过控制RDD分区方式来减少通信开销。分区并不是对所有应用都是有好处的,如果给定RDD只需要被扫描一次,我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助,同时记得将分区得到的新RDD持久化哦。

更多的分区意味着更多的并行任务(Task)数。对于shuffle过程,如果分区中数据量过大可能会引起OOM,这时可以将RDD划分为更多的分区,这同时也将导致更多的并行任务。spark通过线程池的方式复用executor JVM进程,每个Task作为一个线程存在于线程池中,这样就减少了线程的启动开销,可以高效地支持单个executor内的多任务执行,这样你就可以放心地将任务数量设置成比该应用分配到的CPU cores还要多的数量了。

如何分区与分区信息

在创建RDD的时候,可以指定分区的个数,如果没有指定,则分区个数是系统默认值,即该程序所分配到的CPU核心数。在Java/Scala中,你可以使用rdd.getNumPartitions(1.6.0+)或rdd.partitions.size()来获取分区个数。

对基本类型RDD进行重新分区,可以通过repartition()函数,只需要指定重分区的分区数即可。repartition操作会引起shuffle,因此spark提供了一个优化版的repartition,叫做coalesce(),它允许你指定是否需要shuffle。在使用coalesce时,需要注意以下几个问题:

  • coalesce默认shuffle为false,这将形成窄依赖,例如我们将1000个分区重新分到100个中时,并不会引起shuffle,而是原来的10个分区合并形成1个分区。
  • 但是对于从很多个(比如1000个)分区重新分到很少的(比如1个)分区这种极端情况,数据将会分布到很少节点(对于从1000到1的重新分区,则是1个节点)上运行,完全无法开掘集群的并行能力,为了规避这个问题,可以设置shuffle为true。由于shuffle可以分隔stage,这就保证了上一阶段stage中的任务仍是很多个分区在并行计算,不这样设置的话,则两个上下游的任务将合并成一个stage进行计算,这个stage便会在很少的分区中进行计算。
  • 如果当前每个分区的数据量过大,需要将分区数量增加,以利于充分利用并行,这时我们可以设置shuffle为true。对于数据分布不均而需要重分区的情况也是如此。spark默认使用hash分区器将数据重新分区。

对RDD进行预置的hash分区,需将RDD转换为RDD[(key,value)]类型,然后就可以通过隐式转换为PairRDDFunctions,进而可以通过如下形式将RDD哈希分区,HashPartitioner会根据RDD中每个(key,value)中的key得出该记录对应的新的分区号:

1
PairRDDFunctions.partitionBy(new HashPartitioner(n))

另外,spark还提供了一个范围分区器,叫做RangePartitioner。范围分区器争取将所有的分区尽可能分配得到相同多的数据,并且所有分区内数据的上界是有序的。

一个RDD可能存在分区器也可能没有,我们可以通过RDD的partitioner属性来获取其分区器,它返回一个Option对象。

如何进行自定义分区

spark允许你通过提供一个自定义的Partitioner对象来控制RDD的分区方式,这可以让你利用领域知识进一步减少通信开销。

要实现自定义的分区器,你需要继承Partitioner类,并实现下面三个方法即可:

  • numPartitions: 返回创建出来的分区数。
  • getPartition: 返回给定键的分区编号(0到numPartitions-1)。
  • equals: Java判断相等性的标准方法。这个方法的实现非常重要,spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样spark才可以判断两个RDD的分区方式是否相同。

影响分区方式的操作

spark内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器。

不过转化操作的结果并不一定会按照已知的分区方式分区,这时输出的RDD可能就会丢失分区信息。例如,由于map()flatMap()函数理论上可以改变元素的键,因此当你对一个哈希分区的键值对RDD调用map/flatMap时,结果RDD就不会再有分区方式信息。不过,spark提供了另外两个操作mapValues()flatMapValues()作为替代方法,它们可以保证每个二元组的键保持不变。

这里列出了所有会为生成的结果RDD设好分区方式的操作:cogroup()join()leftOuterJoin()rightOuterJoin()fullOuterJoin()groupWith()groupByKey()reduceByKey()combineByKey()partitionBy()sortBy()sortByKey()mapValues()(如果父RDD有分区方式的话)、 flatMapValues()(如果父RDD有分区方式的话)、 filter()(如果父RDD有分区方式的话) 等。其他所有操作生成的结果都不会存在特定的分区方式。

最后,对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中一个父RDD已经设置过分区方式,那么结果就会采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD会采用第一个父RDD的分区方式。

从分区中获益的操作

spark的许多操作都引入了将数据根据键跨节点进行shuffle的过程。所有这些操作都会从数据分区中获益。这些操作主要有:cogroup()join()leftOuterJoin()rightOuterJoin()fullOuterJoin()groupWith()groupByKey()reduceByKey()combineByKey()lookup() 等。

RDD分区优先位置

RDD分区优先位置与spark的调度有关,在spark进行任务调度的时候,会尽可能将任务分配到数据块所存储的节点。我们可以通过RDD的preferredLocations()来获取指定分区的优先位置,返回值是该分区的优先位置列表。

数据加载与保存

从程序中的集合生成

sc.parallelize()可用于从程序中的集合产生RDD。sc.makeRDD()也是在程序中生成RDD,不过其还允许指定每一个RDD分区的优先位置。

以上这些方式一般用于原型开发和测试,因为它们需要把你的整个数据集先放在一台机器(driver节点)的内存中,从而限制了只能用较小的数据量。

从文本文件加载数据

sc.textFile()默认从hdfs中读取文件,在路径前面加上hdfs://可显式表示从hdfs中读取文件,在路径前面加上file://表示从本地文件系统读。给sc.textFile()传递的文件路径可以是如下几种情形:

  • 一个文件路径,这时候只装载指定的文件。
  • 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件)。
  • 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件。

如果想一次性读取一个目录下面的多个文件并想知道数据来自哪个文件,可以使用sc.wholeTextFiles。它会返回一个键值对RDD,其中键是输入文件的文件名。由于该函数会将一个文件作为RDD的一个元素进行读取,因此所读取的文件不能太大,以便其可以在一个机器上装得下。

同其他transform算子一样,文本读取操作也是惰性的并由action算子触发,如果发生重新计算,那么读取数据的操作也可能会被再次执行。另外,在spark中超出内存大小的文件同样是可以被处理的,因为spark并不是将数据一次性全部装入内存,而是边装入边计算。

从数据库加载数据

spark中可以使用JdbcRDD从数据库中加载数据。spark会将数据从数据库中拷贝到集群各个节点,因此使用JdbcRDD会有初始的拷贝数据的开销。也可以考虑使用sqoop将数据从数据库中迁移到hdfs中,然后从hdfs中读取数据。

将结果写入文本文件

rdd.saveAsTextFile()用于将RDD写入文本文件。spark会将传入该函数的路径参数作为目录对待,默认情况下会在对应目录输出多个文件,这取决于并行度。如果要将结果写入hdfs的一个文件中,可以这样:

1
rdd.coalesce(1).saveAsTextFile("filename")

而不要使用repartition,因为repartition会引起shuffle,而coalesce在默认情况下会避免shuffle。

关于文件系统

spark支持读写很多文件系统,包括本地文件系统、HDFS、Amazon S3等等很多。

spark在本地文件系统中读取文件时,它要求文件在集群中所有节点的相同路径下都可以找到。我们可以通过sc.addFile()来将文件弄到所有节点同路径下面,并在各计算节点中通过SparkFiles.get()来获取对应文件在该节点上的绝对路径。

sc.addFile()的输入文件路径不仅可以是本地文件系统的,还可以是HDFS等spark所支持的所有文件系统,甚至还可以是来自网络的,如HTTP、HTTPS、FTP。

关于并行

慎用可变数据

当可变数据用于并发/并行/分布式程序时,都有可能出现问题,因此对于会并发执行的代码段不要使用可变数据。

尤其要注意不要在scala的object中使用var变量!其实scala的object单例对象只是对java中静态的一种封装而已,在class文件层面,object单例对象就是用java中静态(static)来实现的,而java静态成员变量不会被序列化!在编写并行计算程序时,不要在scala的object中使用var变量,如果确实需要使用var变量,请写在class中。

另外,在分布式执行的spark代码段中使用可变的闭包变量也可能会出现不同步问题,因此请谨慎使用。

闭包 vs 广播变量

有两种方式将你的数据从driver节点发送到worker节点:通过闭包和通过广播变量。闭包是随着task的组装和分发自动进行的,而广播变量则是需要程序猿手动操作的,具体地可以通过如下方式操作广播变量(假设scSparkContext类型的对象,bcBroadcast类型的对象):

  • 可通过sc.broadcast(xxx)创建广播变量。
  • 可在各计算节点中(闭包代码中)通过bc.value来引用广播的数据。
  • bc.unpersist()可将各executor中缓存的广播变量删除,后续再使用时数据将被重新发送。
  • bc.destroy()可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。

任务闭包包含了任务所需要的代码和数据,如果一个executor数量小于RDD partition的数量,那么每个executor就会得到多个同样的任务闭包,这通常是低效的。而广播变量则只会将数据发送到每个executor一次,并且可以在多个计算操作中共享该广播变量,而且广播变量使用了类似于p2p形式的非常高效的广播算法,大大提高了效率。另外,广播变量由spark存储管理模块进行管理,并以MEMORY_AND_DISK级别进行持久化存储。

什么时候用闭包自动分发数据?情况有几种:

  • 数据比较小的时候。
  • 数据已在driver程序中可用。典型用例是常量或者配置参数。

什么时候用广播变量分发数据?情况有几种:

  • 数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
  • 数据是某种分布式计算结果。典型用例是训练模型等中间计算结果。

当数据或者变量很小的时候,我们可以在Spark程序中直接使用它们,而无需使用广播变量。

对于大的广播变量,序列化优化可以大大提高网络传输效率,参见本文序列化优化部分。

巧用累加器

累加器提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。可以通过sc.accumulator(xxx)来创建一个累加器,并在各计算节点中(闭包代码中)直接写该累加器。

累加器只能在驱动程序中被读取,对于计算节点(闭包代码)是只写的,这大大精简了累加器的设计。

使用累加器时,我们要注意的是:对于在RDD转化操作中使用的累加器,如果发生了重新计算(这可能在很多种情况下发生),那么累加器就会被重复更新,这会导致问题。而在行动操作(如foreach)中使用累加器却不会出现这种情况。因此,在转化操作中,累加器通常只用于调试目的。尽管将来版本的spark可能会改善这一问题,但在spark 1.2.0中确实存在这个问题。

关于shuffle

在经典的MapReduce中,shuffle(混洗)是连接map阶段和reduce阶段的桥梁(注意这里的术语跟spark的map和reduce操作没有直接关系),它是将各个map的输出结果重新组合作为下阶段各个reduce的输入这样的一个过程,由于这一过程涉及各个节点相互之间的数据传输,故此而名“混洗”。下面这幅图清晰地描述了MapReduce算法的整个流程,其中shuffle阶段是介于map阶段和reduce阶段之间。mapreduce过程

Spark的shuffle过程类似于经典的MapReduce,但是有所改进。spark中的shuffle在实现上主要分为shuffle writeshuffle fetch这两个大的阶段。如下图所示,shuffle过程大致可以描述为:

spark-shuffle过程

  • 首先每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
  • 其次Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中去。
  • 当Reducer启动时,它会根据自己task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

spark的shuffle实现随着spark版本的迭代正在逐步完善和成熟,这中间曾出现过多种优化实现,关于spark shuffle的演进过程和具体实现参见后面的参考链接。

shuffle(具体地是shuffle write阶段)会引起数据缓存到本地磁盘文件,从spark 1.3开始,这些缓存的shuffle文件只有在相应RDD不再被使用时才会被清除,这样在lineage重算的时候shuffle文件就不需要重新创建了,从而加快了重算效率(请注意这里的缓存并保留shuffle数据这一行为与RDD持久化和检查点机制是不同的,缓存并保留shuffle数据只是省去了重算时重建shuffle文件的开销,因此我们才有理由在shuffle(宽依赖)之后对形成的RDD进行持久化)。在standalone模式下,我们可以在spark-env.sh中通过环境变量SPARK_LOCAL_DIRS来设置shuffle数据的本地磁盘缓存目录。为了优化效率,本地shuffle缓存目录的设置都应该使用由单个逗号隔开的目录列表,并且这些目录分布在不同的磁盘上,写操作会被均衡地分配到所有提供的目录中,磁盘越多,可以提供的总吞吐量就越高。另外,SPARK_LOCAL_DIRS也是RDD持久化到磁盘的目录。

参考链接及进一步阅读:

序列化优化

在spark中,序列化通常出现在跨节点的数据传输(如广播变量、shuffle等)和数据持久化过程中。序列化和反序列化的速度、序列化之后数据大小等都影响着集群的计算效率。

spark默认使用Java序列化库,它对于除基本类型的数组以外的任何对象都比较低效。为了优化序列化效率,你可以在spark配置文件中通过spark.serializer属性来设置你想使用的序列化库,一般情况下,你可以使用这个序列化库:org.apache.spark.serializer.KryoSerializer

为了获得最佳性能,你还应该向Kryo注册你想要序列化的类,注册类可以让Kryo避免把每个对象的完整类名写下来,成千上万条记录累计节省的空间相当可观。如果你想强制要求这种注册,可以把spark.kryo.registrationRequired设置为true,这样Kryo会在遇到未注册的类时抛出错误。使用Kryo序列化库并注册所需类的示例如下:

1
2
3
4
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

Spark调度

应用调度

应用是指用户提交的spark应用程序。spark应用程序之间的调度关系,不一定由spark所管理。

在YARN和Mesos模式下,底层资源的调度策略由YARN和Mesos集群资源管理器所决定。

只有在standalone模式下,spark master按照当前集群资源是否满足等待列表中的spark应用对资源的需求,而决定是否创建一个SparkContext对应的driver,进而完成spark应用的启动过程,这个过程可以粗略地认为是一种粗颗粒度的有条件的FIFO(先进先出)调度策略。

作业调度

作业是指spark应用程序内部的由action算子触发并提交的Job。在给定的spark应用中,不同线程的多个job可以并发执行,并且这个调度是线程安全的,这使得一个spark应用可以处理多个请求。

默认地,spark作业调度是FIFO的,在多线程的情况下,某些线程提交的job可能被大大推迟执行。

不过我们可以通过配置FAIR(公平)调度器来使spark在作业之间轮询调度,这样所有的作业都能得到一个大致公平的共享的集群资源。这就意味着即使有一个很长的作业在运行,较短的作业在提交之后也能够得到不错的响应。要启用一个FAIR作业调度,需在创建SparkContext之前配置一下spark.scheduler.modeFAIR

1
2
// 假设conf是你的SparkConf变量
conf.set("spark.scheduler.mode", "FAIR")

公平调度还支持在池中将工作分组(这样就形成两级调度池),而不同的池可以设置不同的调度选项(如权重)。这种方式允许更重要的job配置在高优先级池中优先调度。如果没有设置,新提交的job将进入默认池中,我们可以通过在对应线程中给SparkContext设置本地属性spark.scheduler.pool来设置该线程对应的pool:

1
2
// 假设sc是你的SparkContext变量
sc.setLocalProperty("spark.scheduler.pool", "pool1")

在设置了本地属性之后,所有在这个线程中提交的job都将会使用这个调度池的名字。如果你想清除该线程相关的pool,只需调用如下代码:

1
sc.setLocalProperty("spark.scheduler.pool", null)

在默认情况下,每个调度池拥有相同的优先级来共享整个应用所分得的集群资源。同样的,默认池中的每个job也拥有同样的调度优先级,但是在用户创建的每个池中,job是通过FIFO方式进行调度的

关于公平调度池的详细配置,请参见官方文档:Spark Job Scheduling

如果你想阅读相关实现代码,可以观看Schedulable.scalaSchedulingAlgorithm.scala以及SchedulableBuilder.scala等相关文件。

参考链接及进一步阅读:

容错机制与检查点

spark容错机制是粗粒度并且是轻量级的,主要依赖于RDD的依赖链(lineage)。spark能够通过lineage获取足够的信息来重新计算和恢复丢失的数据分区。这样的基于lineage的容错机制可以理解为粗粒度的重做日志(redo log)。

鉴于spark的基于lineage的容错机制,RDD DAG中宽窄依赖的划分对容错也有很重要的作用。如果一个节点宕机了,而且运算是窄依赖,那只要把丢失的父RDD分区重算即可,跟其他节点没有依赖。而宽依赖需要父RDD的所有分区都存在,重算代价就很高了。可以这样理解为什么窄依赖开销小而宽依赖开销大:在窄依赖中,在子RDD分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算;而在宽依赖中,丢失一个子RDD分区将导致其每个父RDD的多个甚至所有分区的重算,而重算的结果并不都是给当前丢失的子RDD分区用的,这样就存在了冗余计算。

不过我们可以通过检查点(checkpoint)机制解决上述问题,通过在RDD上做检查点可以将物理RDD数据存储到持久层(HDFS、S3等)中。在RDD上做检查点的方法是在调用action算子之前调用checkpoint(),并且RDD最好是缓存在内存中的,否则可能导致重算(参见API注释)。示例如下:

1
2
3
4
// 假设rdd是你的RDD变量
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.checkpoint()
val count = rdd.count()

在RDD上做检查点会切断RDD依赖,具体地spark会清空该RDD的父RDD依赖列表。并且由于检查点机制是将RDD存储在外部存储系统上,所以它可以被其他应用重用。

过长的lineage(如在pagerank、spark streaming等中)也将导致过大的重算代价,而且还会占用很多系统资源。因此,在遇到宽依赖或者lineage足够长时,我们都应该考虑做检查点

集群监控与运行日志

spark在应用执行时记录详细的进度信息和性能指标。这些内容可以在两个地方找到:spark的网页用户界面以及driver进程和executor进程生成的日志文件中。

网页用户界面

在浏览器中打开 http://master:8080 页面,你可以看到集群概况,包括:集群节点、可用的和已用的资源、已运行的和正在运行的应用等。

http://master:4040 页面用来监控正在运行的应用(默认端口为4040,如果有多个应用在运行,那么端口顺延,如4041、4042),包括其执行进度、构成Job的Stage的执行情况、Stage详情、已缓存RDD的信息、各executor的信息、spark配置项以及应用依赖信息等,该页面经常用来发现应用的效率瓶颈并辅助优化,不过该页面只有在有spark应用运行时才可以被访问到。

上述404x端口可用于查看正在运行的应用的执行详情,但是应用运行结束之后该页面就不可以访问了。要想查看已经执行结束的应用的执行详情,则需开启事件日志机制,具体地设置如下两个选项:

  • spark.eventLog.enabled: 设置为true时开启事件日志机制。这样已完成的spark作业就可以通过历史服务器查看。

  • spark.eventLog.dir: 开启事件日志机制时的事件日志文件存储位置。如果要在历史服务器中查看事件日志,需要将该值设置为一个全局可见的文件系统路径,比如HDFS中。最后,请确保目录以 ‘/‘ 结束,否则可能会出现如下错误:

    1
    2
    Application history not found ... No event logs found for application ...
    Did you specify the correct logging directory?

在配置好上述选项之后,我们就可以查看新提交的应用的详细执行信息了。在不同的部署模式中,查看的方式不同。在standalone模式中,可以直接在master节点的UI界面(上述8080端口对应的页面)中直接单击已完成应用以查看详细执行信息。在YARN/Mesos模式中,就要开启历史服务器了,此处略去。

Metrics系统

spark在其内部拥有一个可配置的度量系统(Metrics),它能够将spark的内部状态通过HTTP、JMX、CSV等多种不同形式呈现给用户。同时,用户也可以定义自己的数据源(Metrics Source)和数据输出方式(Metrics Sink),从而获取自己所需的数据。此处略去详情,可参考下面的链接进一步阅读。

参考链接及进一步阅读:

查看日志文件

spark日志文件的具体位置取决于具体的部署模式。在standalone模式中,日志默认存储于各个工作节点的spark目录下的work目录中,此时所有日志还可以直接通过主节点的网页用户界面进行查看。

默认情况下,spark输出的日志包含的信息量比较合适。我们可以自定义日志行为,改变日志等级或存储位置。spark日志系统使用log4j实现,我们只需将conf目录下的log4j.properties.template复制一个并命名为log4j.properties,然后自定义修改即可。

SparkConf与配置

spark中最主要的配置机制是通过SparkConf类对spark进行配置。当创建出一个SparkContext时,就需要创建出一个SparkConf的实例作为参数。

SparkConf实例包含用户要重载的配置选项的键值对,spark中的每个配置选项都是基于字符串形式的键值对。你可以调用SparkConf的set()或者setXxx()来设置对应选项。

另外,spark-submit脚本可以动态设置配置项。当应用被spark-submit脚本启动时,脚本会把这些配置项设置到运行环境中。当一个新的SparkConf被创建出来时,这些环境变量会被检测出来并且自动配到SparkConf中。这样在使用spark-submit时,用户应用通常只需创建一个“空”的SparkConf,并直接传递给SparkContext的构造方法即可。

spark-submit为常用的spark配置选项提供了专用的标记,还有一个通用标记--conf来接收任意spark配置项的值,形如--conf 属性名=属性值

spark-submit也支持从文件中读取配置项的值。默认情况下,spark-submit会在spark安装目录中找到conf/spark-defaults.conf文件,读取该文件中以空格隔开的键值对数据。你也可以通过spark-submit的--properties-File选项来自定义该文件的路径。

spark-defaults.conf的作用范围要搞清楚,编辑driver所在机器上的spark-defaults.conf,该文件会影响到driver所提交运行的application,及专门为该application提供计算资源的executor的启动参数。

spark有特定的优先级顺序来选择实际配置。优先级最高的是在用户代码中显式调用set()方法设置的选项。其次是通过spark-submit传递的参数。再次是写在配置文件中的值。最后是系统默认值。如果你想知道应用中实际生效的配置,可以在应用的网页用户界面中查看。

下面列出一些常用的配置项,完整的配置项列表可以参见官方配置文档

选项 默认值 描述
spark.master (none) 表示要连接的集群管理器。
spark.app.name (none) 应用名,将出现在UI和日志中。
spark.driver.memory 1g 为driver进程分配的内存。注意:在客户端模式中,不能在SparkConf中直接配置该项,因为driver JVM进程已经启动了。
spark.executor.memory 1g 为每个executor进程分配的内存。
spark.executor.cores all/1 每个executor可用的核心数。针对standalone和YARN模式。更多参见官方文档。
spark.cores.max (not set) 设置standalone和Mesos模式下应用程序的核心数上限。
spark.speculation false 设置为true时开启任务预测执行机制。当出现比较慢的任务时,这种机制会在另外的节点上也尝试执行该任务的一个副本。打开此选项会帮助减少大规模集群中个别较慢的任务带来的影响。
spark.driver.extraJavaOptions (none) 设置driver节点的JVM启动参数。
spark.executor.extraJavaOptions (none) 设置executor节点的JVM启动参数。
spark.serializer JavaSerializer 指定用来进行序列化的类库,包括通过网络传输数据或缓存数据时的序列化。为了速度,推荐使用KryoSerializer。
spark.eventLog.enabled false 设置为true时开启事件日志机制。这样已完成的spark作业就可以通过历史服务器查看。
spark.eventLog.dir file:///tmp/spark-events 开启事件日志机制时的事件日志文件存储位置。如果要在历史服务器中查看事件日志,需要将该值设置为一个全局可见的文件系统路径,比如HDFS中。最后,请确保目录以 ‘/‘ 结束,否则可能会出现错误,参见本文集群监控部分。

一些问题的解决办法

/tmp目录写满

由于Spark在计算的时候会将中间结果存储到/tmp目录,而目前linux又都支持tmpfs,其实说白了就是将/tmp目录挂载到内存当中。那么这里就存在一个问题,中间结果过多导致/tmp目录写满而出现如下错误:

1
No Space Left on the device

解决办法就是针对tmp目录不启用tmpfs,修改/etc/fstab。

无法创建进程

有时可能会遇到如下错误,即无法创建进程:

1
java.lang.OutOfMemory, unable to create new native thread

导致这种错误的原因比较多。有一种情况并非真的是内存不足引起的,而是由于超出了允许的最大文件句柄数或最大进程数。

排查的步骤就是查看一下允许打开的文件句柄数和最大进程数,如果数值过低,使用ulimit将其调高之后,再试试问题是否已经解决。

不可序列化

1
Task not serializable: java.io.NotSerializableException

作为RDD操作算子参数的匿名函数使用外部变量从而形成闭包。为了效率,spark并不是将所有东东都序列化以分发到各个executor。spark会先对该匿名函数进行ClosureCleaner.clean()处理(将该匿名函数涉及到的$outer中的与闭包无关的变量移除),然后将该匿名函数对象及闭包涉及到的对象序列化并包装成task分发到各个executor。

看到这里,你或许就发现了一个问题,那就是不管怎样,spark需要序列化的对象必须都可以被序列化!Task not serializable: java.io.NotSerializableException错误就是由于相应的对象不能被序列化造成的!

为了解决这个问题,首先你可以使用 -Dsun.io.serialization.extendedDebugInfo=true java选项来让jvm打印出更多的关于序列化的信息,以便了解哪些对象不可以被序列化。然后就是使这些对象对应的类可序列化,或者将这些对象定义在RDD操作算子的参数(匿名函数)中以取消闭包。

缺少winutils.exe

在windows上进行spark程序测试时,你可能会碰到如下几个问题:

1
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
1
2
java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)

原因就是缺少 hadoop 的 winutils.exe 这个文件。解决方法是:下载一个(注意是32位还是64位),新建一个文件夹 D:\hadoop\bin\ 并将 winutils.exe 放入其中,并保证winutils.exe双击运行没有报*.dll缺失的错误,然后在程序中设置一下hadoop目录即可,如下:

1
System.setProperty("hadoop.home.dir", "D:\hadoop\")

gdbt分类器

[TOC]

GBTClassifier文件中包含有两个class文件:GBTClassifier 和 GBTClassificationModel

##1、GBTClassifier

class GBTClassifier 继承自 Estimator ,由此可见,GBTClassifier完成的工作是模型的评估/训练,实现样本数据到模型的过程。

class GBTClassifier 主要方法有相关参数的设置方法、一个train方法和copy方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class GBTClassifier @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Predictor[Vector, GBTClassifier, GBTClassificationModel]
with GBTClassifierParams with DefaultParamsWritable with Logging {

override def setMaxDepth(value: Int): this.type = set(maxDepth, value)
override def setMaxBins(value: Int): this.type = set(maxBins, value)
override def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value)
override def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
override def setMaxMemoryInMB(value: Int): this.type = set(maxMemoryInMB, value)
override def setCacheNodeIds(value: Boolean): this.type = set(cacheNodeIds, value)
override def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
override def setImpurity(value: String): this.type = {
logWarning("GBTClassifier.setImpurity should NOT be used")
this
}
override def setSubsamplingRate(value: Double): this.type = set(subsamplingRate, value)
override def setSeed(value: Long): this.type = set(seed, value)
override def setMaxIter(value: Int): this.type = set(maxIter, value)
override def setStepSize(value: Double): this.type = set(stepSize, value)
def setLossType(value: String): this.type = set(lossType, value)

override protected def train(dataset: Dataset[_]): GBTClassificationModel = {
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val oldDataset: RDD[LabeledPoint] =
dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
case Row(label: Double, features: Vector) =>
require(label == 0 || label == 1, s"GBTClassifier was given" +
s" dataset with invalid label $label. Labels must be in {0,1}; note that" +
s" GBTClassifier currently only supports binary classification.")
LabeledPoint(label, features)
}
val numFeatures = oldDataset.first().features.size
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)

val instr = Instrumentation.create(this, oldDataset)
instr.logParams(params: _*)
instr.logNumFeatures(numFeatures)
instr.logNumClasses(2)

val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,
$(seed))
val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
instr.logSuccess(m)
m
}

@Since("1.4.1")
override def copy(extra: ParamMap): GBTClassifier = defaultCopy(extra)
}

二、GBTClassificationModel

GBTClassificationModel 继承自 PredictionModel,一个 Transformer,完成 DataFrame 到 DataFrame 的转换。

GBTClassificationModel中比较重要的方法是 transformImpl() 和 predict()。

transformImpl方法主要完成的功能是将 featuresCol数据 进行相关计算,得到 predict值,并将该值储存为新列。

需要注意的是,gdbt中 最终得到的 predict值 会是 1 或 0,无法得到 预测分数值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
override protected def transformImpl(dataset: Dataset[_]): DataFrame = {
val bcastModel = dataset.sparkSession.sparkContext.broadcast(this)
val predictUDF = udf { (features: Any) =>
bcastModel.value.predict(features.asInstanceOf[Vector])
}
dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
}

override protected def predict(features: Vector): Double = {
// TODO: When we add a generic Boosting class, handle transform there? SPARK-7129
// Classifies by thresholding sum of weighted tree predictions
val treePredictions = _trees.map(_.rootNode.predictImpl(features).prediction)
val prediction = blas.ddot(numTrees, treePredictions, 1, _treeWeights, 1)
if (prediction > 0.0) 1.0 else 0.0
}

文章来源:https://github.com/ColZer/DigAndBuried/blob/master/spark/mllib-pipeline.md

以下为原文:

在2014年11月,他就在Spark MLLib代码中CI了一个全新的package:”org.apache.spark.ml”, 和传统的”org.apache.spark.mllib”独立, 这个包即Spark MLLib的
Pipeline and Parameters

pipeline即机器学习流水线, 在实际的机器学习应用过程中,一个任务(Job)由很多过程(Stage)组成, 比如日志的收集,清理,到字段/属性/feature的提取, 多算法的组合而成的模型,
模型的离线训练和评估,到最后的模型在线服务和在线评估,所进行的每一个stage都可以概况为数据到数据的转换以及数据到模型的转换.

后面我们会看到,mllib pipeline中的stage也做了相应的划分.

Parameters即参数化,机器学习大部分的过程是一个参数调优的过程,一个模型应该很显示的告诉使用者,模型有那些参数可以进行设置,以及这些参数的默认值是怎么样的;在现有的机器学习算法中,
模型的参数可能是以模型类字段或通过一些函数进行设置,不是很直观的展现当前模型有哪些可以调优的参数;

针对这个问题,在这个版本中,提出了Parameters功能,通过trait的方式显式地指定stage拥有那些参数.

下面我们就针对这两个部分进行分析, pipeline到我今天学习为止,还只有三个PR,很多东西应该还在实验中,这里做一个分析仅供学习和备忘.

##一、Parameters

1
2
3
4
5
6
7
8
class LogisticRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

private var threshold: Option[Double] = Some(0.5)
//...
}

上面是传统的org.apache.spark.mllib包中一个分类器:LogisticRegressionModel, 如果理解logistics分类器,那么我知道其中的threshold为模型一个很重要的参数.
但是从对于一般的用户来说,我们只知道这个模型类中有一个threshold字段,并不能很清楚了该字段是否是模型可调优的参数,还是只是类的一个”全局变量”而已;

针对这个问题, 就有了Parameters参数化的概念,先直接看结果:

1
2
3
4
5
6
7
8
9
class LogisticRegressionModel private[ml] (
override val parent: LogisticRegression,
override val fittingParamMap: ParamMap,
weights: Vector)
extends Model[LogisticRegressionModel] with LogisticRegressionParams {

private[classification] trait LogisticRegressionParams extends Params
with HasRegParam with HasMaxIter with HasLabelCol with HasThreshold with HasFeaturesCol
with HasScoreCol with HasPredictionCol {

我们看到这里的LogisticRegressionModel实现了LogisticRegressionParams,而LogisticRegressionParams继承了Params类,并且”拥有”一组Param,即HasMaxIter, HasRegParam之类.
相比传统的LogisticRegressionModel, 这个版本我们可以清楚的看到,该模型有RegParam, MaxIter等7个可控参数,其中包括我们上面谈到的threshold参数, 即HasThreshold.

即Parameters 将mllib中的组件参数进行标准化和可视化,下面我们继续针对Parameters进行分析.

1
2
class Param[T] (val parent: Params,val name: String,val doc: String,
val defaultValue: Option[T] = None)extends Serializable {

Param表示一个参数,从概念上来说,一个参数有下面几个属性:

  • param的类型:即上面的[T], 它表示param值是何种类型
  • param的名称:即name
  • param的描述信息,和name相比, 它可以更长更详细, 即doc
  • param的默认值, 即defaultValue

针对param的类型,ml提供了一组默认的子类, 如IntParam,FloatParam之类的.就不详细展开

另外针对Param, 提供了接口来设置Param的值

1
2
3
def w(value: T): ParamPair[T] = this -> value
def ->(value: T): ParamPair[T] = ParamPair(this, value)
case class ParamPair[T](param: Param[T], value: T)

即将Param和value封装为paramPair类, paramPair是一个case类,没有其他的方法, 仅仅封装了Param和Value对. 因此我们可以通过param->value来创建一个ParamPair, 后面我们会看到怎么对ParamPair
进行管理.

上面谈到Param其中一个参数我们没有进行描述, 即”parent: Params”. 如果站在模型, 特征提取器等组件角度来, 它们应该是一个param的容器,它们的逻辑代码可以使用自身的容器中的param.
换句话说,如果一个组件继承自Params类,那么该组件就是一个被参数化的组件.

参考上面的LogisticRegressionModel, 它继承了LogisticRegressionParams, 而LogisticRegressionParams实现了Params, 此时LogisticRegressionModel就是一个被参数化的模型. 即自身是一个param容器

现在问题来了, 一个组件继承自Params, 然而我们没有通过add等方式来将相应的param添加到这个容器里面, 而是通过”with HasRegParam”方式来进行设置的,到底是怎么实现的呢?看一个rf现

1
2
3
4
5
6
7
8
9
private[ml] trait HasRegParam extends Params {
val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter")
def getRegParam: Double = get(regParam)
}

private[ml] trait HasMaxIter extends Params {
val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations")
def getMaxIter: Int = get(maxIter)
}

我们看到每个具体的RegParam都是继承自Params, 这个继承感觉意义不大,所有这里就不纠结它的继承机制, 核心是它的val regParam: DoubleParam常量的定义,这里的常量会被编译为一个函数,
其中函数为public, 返回值为DoubleParam, 参数为空. 为什么要强调这个呢?这是规范. 一个具体的Param只有这样的实现, 它才会被组件的Params容器捕获. 怎么捕获呢? 在Params中有这样一个代码:

1
2
3
4
5
6
7
8
9
10
// 使用 java 反射的方法列出所有的(没有入参和返回值为Param)的方法
def params: Array[Param[_]] = {
val methods = this.getClass.getMethods
methods.filter { m =>
Modifier.isPublic(m.getModifiers) &&
classOf[Param[_]].isAssignableFrom(m.getReturnType) &&
m.getParameterTypes.isEmpty
}.sortBy(_.getName)
.map(m => m.invoke(this).asInstanceOf[Param[_]])
}

这里就清晰了,一个组件, 继承Params类, 成为一个可参数化的组件, 该组件就有一个params方法可以返回该组件所有配置信息,即 Array[Param[_]]. 因为我们组件使用With方式继承了符合上面规范的”常量定义”,
这些param会被这里的def params所捕获, 从而返回该组件所有的Params;

不过还有一个问题,我们一直在说, 组件继承Params类, 成为一个可参数化的组件,但是我这里def params只是返回 Array[Param[_]], 而一个属性应该有Param和value组成, 即上面我们谈到ParamPair,
因此Params类中应该还有一个容器,维护Array[ParamPair], 或者还有一个Map,维护param->value对.

没错,这就是ParamMap的功能, 它维护了维护param->value对.并且每个实现Params的组件都有一个paramMap字段. 如下:

1
2
3
trait Params extends Identifiable with Serializable {
protected val paramMap: ParamMap = ParamMap.empty
}

具体的ParamMap实现这里就不分析, 就是一个Map的封装, 这里我们总结一下Parameters的功能:

Parameters即组件的可参数化, 一个组件,可以是模型,可以是特征选择器, 如果继承自Params, 那么它就是被参数化的组件, 将具体参数按照HasMaxIter类的规范进行定义, 然后通过With的
方式追加到组件中,从而表示该组件有相应的参数, 并通过调用Params中的getParam,set等方法来操作相应的param.

整体来说Parameters的功能就是组件参数标准化

二、Pipeline

模型训练是整个数据挖掘和机器学习的目标, 如果把整个过程看着为一个黑盒, 内部可以包含了很多很多的特征提取, 模型训练等子stage,
但是站在黑盒的外面, 整个黑盒的输出就是一个模型(Model), 我们目标就是训练出一个完美的模型, 然后再利于该模型去做服务.

这句话就是pipeline的核心, 首先pipeline是一个黑盒生成的过程, 它对外提供fit接口, 完成黑盒的训练, 生成一个黑盒模型, 即PipelineModel

如果站在白盒的角度来看, pipeline的黑盒中肯定维护了一组stage, 这些stage可以是原子的stage,也可能继续是一个黑盒模型, 在fit接口调用时候,
会按照流水线的顺序依次调用每个stage的fit/transform函数,最后输出PipelineModel.

2.1 PipelineStage

如上所言, mllib pipeline是基于Spark SQL中的schemeRDD来实现, pipeline中每一次处理操作都表现为对schemeRDD的scheme或数据进行处理, 这里的操作步骤被抽象为Stage, 即PipelineStage

1
2
3
abstract class PipelineStage extends Serializable with Logging {
private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType
}

抽象的PipelineStage的实现很简单, 只提供了transformSchema虚函数, 由具体的stage进行实现,从而在一定参数paramMap作用下,对scheme进行修改(transform).

上面我们也谈到, 具体的stage是在PipelineStage基础上划分为两大类, 即数据到数据的转换(transform)以及数据到模型的转换(fit).

  • Transformer: 数据到数据的转换
  • Estimator: 数据到模型的转换

2.1.1 Transformer

我们首先来看Transformer, 数据预处理, 特征选择与提取都表现为Transformer, 它对提供的SchemaRDD进行转换生成新的SchemaRDD, 如下所示:

1
2
3
abstract class Transformer extends PipelineStage with Params {
def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD
}

特殊的Transformer:模型

在mllib中, 有一种特殊的Transformer, 即模型(Model), 下面我们会看到模型是Estimator stage的产物,但是model本身是一个Transformer,
模型是经过训练和挖掘以后的一个对象, 它的一个主要功能就是预测/推荐服务, 即它可以对传入的dataset:SchemaRDD进行预测, 填充dataset中残缺的决策字段或评分字段, 返回更新后的SchemaRDD

2.1.2 Estimator

Estimator stage的功能是模型的估计/训练, 即它是一个SchemaRDD到Model的fit过程. 如下所示fit接口.

1
2
3
abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
def fit(dataset: SchemaRDD, paramMap: ParamMap): M
}

2.2 PipelineModel

PipelineModel是由一组Transformer组成, 在对dataset进行预测(transform)时, 是按照Transformer的有序性(Array)逐步的对dataset进行处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
class PipelineModel private[ml] (
override val parent: Pipeline,
override val fittingParamMap: ParamMap,
private[ml] val stages: Array[Transformer])
extends Model[PipelineModel] with Logging {

override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
// Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
val map = (fittingParamMap ++ this.paramMap) ++ paramMap
transformSchema(dataset.schema, map, logging = true)
stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, map))
}
}

2.3 Pipeline

Pipeline首先是一个Estimator, fit输出的模型为 PipelineModel, 其次Pipeline也继承Params类, 即被参数化,
其中有一个参数, 即stages, 它的值为Array[PipelineStage], 该参数存储了Pipeline拥有的所有的stage;

Pipeline提供了fittransformSchema两个接口:

  • transformSchema 接口: 使用foldLeft函数, 对schema进行转换.
  • fit 接口: 遍历 stages列表。将 Estimator 类型的stage 转换为 model(model其实就是一种Transformer)
class Pipeline extends Estimator[PipelineModel] {
 val stages: Param[Array[PipelineStage]] = new Param(this, "stages", "stages of the pipeline")
 def setStages(value: Array[PipelineStage]): this.type = { set(stages, value); this }
 def getStages: Array[PipelineStage] = get(stages)

 override def fit(dataset: SchemaRDD, paramMap: ParamMap): PipelineModel = {
     transformSchema(dataset.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val theStages = map(stages)
     // 记录最后一个 Estimator 的位置
     var indexOfLastEstimator = -1
     theStages.view.zipWithIndex.foreach { case (stage, index) =>
       stage match {
         case _: Estimator[_] =>
           indexOfLastEstimator = index
         case _ =>
       }
     }
     var curDataset = dataset
     val transformers = ListBuffer.empty[Transformer]
     //遍历所有的stages
     theStages.view.zipWithIndex.foreach { case (stage, index) =>
       // 遍历在 indexOfLastEstimator 之前的stage
       if (index <= indexOfLastEstimator) {
         val transformer = stage match {
           // 若 stage 为 Estimator,则将 estimator.fit的结果返回 
           case estimator: Estimator[_] =>
             estimator.fit(curDataset, paramMap)
           // 若 stage 为 Transformer,直接返回
           case t: Transformer =>
             t
           case _ =>
             throw new IllegalArgumentException(
               s"Do not support stage $stage of type ${stage.getClass}")
         }
         curDataset = transformer.transform(curDataset, paramMap)
         // 将返回的 transformer 加入 transformers列表
         transformers += transformer
       } else {
         //直接将 stage 转为 Transformer
         transformers += stage.asInstanceOf[Transformer]
       }
     }

     new PipelineModel(this, map, transformers.toArray)
   }

 private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
     val map = this.paramMap ++ paramMap
     val theStages = map(stages)
     require(theStages.toSet.size == theStages.size,
            "Cannot have duplicate components in a pipeline.")
     theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur, paramMap))
        }
}

2.3 实例讲解

Transformer1 ---> Estimator1 --> Transformer2 --> Transformer3 --> Estimator2 --> Transformer4

我们知道Estimator和Transformer的区别是, Estimator需要经过一次fit操作, 才会输出一个Transformer, 而Transformer直接就是Transformer;

对于训练过程中的Transformer,只有一个数据经过Transformer操作后会被后面的Estimator拿来做fit操作的前提下,该Transformer操作才是有意义的,否则训练数据不应该经过该Transformer.

拿上面的Transformer4来说, 它后面没有Estimator操作, 如果对训练数据进行Transformer操作没有任何意义,但是在预测数据上还是有作用的,所以它可以直接用来构建PipelineModel.

对于训练过程中Estimator(非最后一个Estimator), 即上面的Estimator1,非Estimator2, 它训练数据上训练出的模型以后, 需要利用该模型对训练数据进行transform操作,输出的数据
继续进行后面Estimator和Transformer操作.

拿缺失值填充的例子来说, 我们可以利用当前数据,训练出一个模型, 该模型计算出每个字段的平均值, 然后我们理解利用这个模型对训练数据进行缺失值填充.

但是对于最后一个Estimator,其实是没有必要做这个”curDataset = transformer.transform(curDataset, paramMap)”操作的, 所以这里还是可以有优化的!!嘿嘿!!!

##三、总结

到目前为止,已经将Pipeline讲解的比较清楚了, 利用Pipeline可以将数据挖掘中各个步骤进行流水线化, api方便还是很简介清晰的!

最后拿孟祥瑞CI的描述信息中一个例子做结尾,其中pipeline包含两个stage,顺序为StandardScaler和LogisticRegression

val scaler = new StandardScaler() 
    .setInputCol("features") 
    .setOutputCol("scaledFeatures") 
val lr = new LogisticRegression() 
    .setFeaturesCol(scaler.getOutputCol) 

val pipeline = new Pipeline() 
    .setStages(Array(scaler, lr)) 
val model = pipeline.fit(dataset) 
val predictions = model.transform(dataset) 
    .select('label, 'score, 'prediction)

Date and Time Functions

[TOC]

Name Description
[current_date] Gives current date as a date column
[current_timestamp]
[date_format]
[to_date] Converts column to date type (with an optional date format)
[to_timestamp] Converts column to timestamp type (with an optional timestamp format)
[unix_timestamp] Converts current or specified time to Unix timestamp (in seconds)
[window] Generates time windows (i.e. tumbling, sliding and delayed windows)

current_date - Current Date As Date Column

1
current_date(): Column

current_date function gives the current date as a [date] column.

1
2
3
4
5
6
7
8
9
10
11
val df = spark.range(1).select(current_date)
scala> df.show
+--------------+
|current_date()|
+--------------+
| 2017-09-16|
+--------------+

scala> df.printSchema
root
|-- current_date(): date (nullable = false)

Internally, current_date creates a Column with CurrentDate Catalyst leaf expression.

1
2
3
4
5
6
7
8
val c = current_date()
import org.apache.spark.sql.catalyst.expressions.CurrentDate
val cd = c.expr.asInstanceOf[CurrentDate]
scala> println(cd.prettyName)
current_date

scala> println(cd.numberedTreeString)
00 current_date(None)

date_format

1
date_format(dateExpr: Column, format: String): Column

Internally, date_format creates a Column with DateFormatClass binary expression. DateFormatClass takes the expression from dateExpr column and format.

1
2
3
4
5
6
7
8
9
10
11
val c = date_format($"date", "dd/MM/yyyy")

import org.apache.spark.sql.catalyst.expressions.DateFormatClass
val dfc = c.expr.asInstanceOf[DateFormatClass]
scala> println(dfc.prettyName)
date_format

scala> println(dfc.numberedTreeString)
00 date_format('date, dd/MM/yyyy, None)
01 :- 'date
02 +- dd/MM/yyyy

current_timestamp

1
current_timestamp(): Column

Note : current_timestamp is also now function in SQL.

unix_timestamp - Converting Current or Specified Time to Unix Timestamp

1
2
3
unix_timestamp(): Column  (1)
unix_timestamp(time: Column): Column (2)
unix_timestamp(time: Column, format: String): Column
  1. Gives current timestamp (in seconds)
  2. Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds)

unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds).

unix_timestamp supports a column of type Date, Timestamp or String.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// no time and format => current time
scala> spark.range(1).select(unix_timestamp as "current_timestamp").show
+-----------------+
|current_timestamp|
+-----------------+
| 1493362850|
+-----------------+

// no format so yyyy-MM-dd HH:mm:ss assumed
scala> Seq("2017-01-01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
| time|unix_timestamp|
+-------------------+--------------+
|2017-01-01 00:00:00| 1483225200|
+-------------------+--------------+

scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time", "yyyy/MM/dd")).show
+-------------------+--------------+
| time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00| 1483225200|
+-------------------+--------------+

unix_timestamp returns null if conversion fails.

1
2
3
4
5
6
7
// note slashes as date separators
scala> Seq("2017/01/01 00:00:00").toDF("time").withColumn("unix_timestamp", unix_timestamp($"time")).show
+-------------------+--------------+
| time|unix_timestamp|
+-------------------+--------------+
|2017/01/01 00:00:00| null|
+-------------------+--------------+

Note: unix_timestamp is also supported in [SQL mode]

1
2
3
scala> spark.sql("SELECT unix_timestamp() as unix_timestamp").show 
unix_timestamp
1493369225

Internally, unix_timestamp creates a Column with UnixTimestamp binary expression (possibly with CurrentTimestamp).

window - Generating Time Windows 

1
2
3
4
5
6
7
8
9
10
11
12
window(
timeColumn: Column,
windowDuration: String): Column (1)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String): Column (2)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column (3)
  1. Creates a tumbling time window with slideDuration as windowDuration and 0 second for startTime
  2. Creates a sliding time window with 0 second for startTime
  3. Creates a delayed time window

window generates tumbling, sliding or delayed time windows of windowDuration duration given a timeColumn timestamp specifying column.

Note: From Tumbling Window (Azure Stream Analytics):Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. |

Note: From Introducing Stream Windows in Apache Flink:Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval.Tumbling windows discretize a stream into non-overlapping windows.

1
2
scala> val timeColumn = window('time, "5 seconds")
timeColumn: org.apache.spark.sql.Column = timewindow(time, 5000000, 5000000, 0) AS `window`

timeColumn should be of TimestampType, i.e. with java.sql.Timestamp values.

Tip Use java.sql.Timestamp.from or java.sql.Timestamp.valueOf factory methods to create Timestamp instances.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
// (year, month, dayOfMonth, hour, minute, second)
((2012, 12, 12, 12, 12, 12), 5),
((2012, 12, 12, 12, 12, 14), 9),
((2012, 12, 12, 13, 13, 14), 4),
((2016, 8, 13, 0, 0, 0), 10),
((2017, 5, 27, 0, 0, 0), 15)).
map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
toDF("time", "level")
scala> levels.show
+-------------------+-----+
| time|level|
+-------------------+-----+
|2012-12-12 12:12:12| 5|
|2012-12-12 12:12:14| 9|
|2012-12-12 13:13:14| 4|
|2016-08-13 00:00:00| 10|
|2017-05-27 00:00:00| 15|
+-------------------+-----+

val q = levels.select(window($"time", "5 seconds"), $"level")
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5 |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9 |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4 |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10 |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15 |
+---------------------------------------------+-----+

scala> q.printSchema
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- level: integer (nullable = false)

// calculating the sum of levels every 5 seconds
val sums = levels.
groupBy(window($"time", "5 seconds")).
agg(sum("level") as "level_sum").
select("window.start", "window.end", "level_sum")
scala> sums.show
+-------------------+-------------------+---------+
| start| end|level_sum|
+-------------------+-------------------+---------+
|2012-12-12 13:13:10|2012-12-12 13:13:15| 4|
|2012-12-12 12:12:10|2012-12-12 12:12:15| 14|
|2016-08-13 00:00:00|2016-08-13 00:00:05| 10|
|2017-05-27 00:00:00|2017-05-27 00:00:05| 15|
+-------------------+-------------------+---------+

windowDuration and slideDuration are strings specifying the width of the window for duration and sliding identifiers, respectively.

Use CalendarInterval for valid window identifiers.

window is available as of Spark 2.0.0.

Internally, window creates a Column (with TimeWindow expression) available as window alias.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// q is the query defined earlier
scala> q.show(truncate = false)
+---------------------------------------------+-----+
|window |level|
+---------------------------------------------+-----+
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|5 |
|[2012-12-12 12:12:10.0,2012-12-12 12:12:15.0]|9 |
|[2012-12-12 13:13:10.0,2012-12-12 13:13:15.0]|4 |
|[2016-08-13 00:00:00.0,2016-08-13 00:00:05.0]|10 |
|[2017-05-27 00:00:00.0,2017-05-27 00:00:05.0]|15 |
+---------------------------------------------+-----+

scala> println(timeColumn.expr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#22
01 +- timewindow('time, 5000000, 5000000, 0)
02 +- 'time

Example — Traffic Sensor

The example is borrowed from Introducing Stream Windows in Apache Flink.

The example shows how to use window function to model a traffic sensor that counts every 15 seconds the number of vehicles passing a certain location.

to_date — Converting Column To DateType 

1
2
to_date(e: Column): Column
to_date(e: Column, fmt: String): Column

to_date converts the column into DateType (by casting to DateType).

fmt follows the formatting styles.

Internally, to_date creates a Column with ParseToDate expression (and Literal expression for fmt).

Use ParseToDate expression to use a column for the values of fmt.

to_timestamp — Converting Column To TimestampType 

1
2
to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column

to_timestamp converts the column into TimestampType (by casting to TimestampType).

fmt follows the formatting styles.

Internally, to_timestamp creates a Column with ParseToTimestamp expression (and Literalexpression for fmt).

Use ParseToTimestamp expression to use a column for the values of fmt.

Spark性能优化指南——基础篇(转)

来源: Spark性能优化指南——基础篇

前言

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。

然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。

笔者根据之前的Spark作业开发经验以及实践积累,总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同学,主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。

本文作为Spark性能优化指南的基础篇,主要讲解开发调优以及资源调优。

开发调优

调优概述

Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

原则一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。

// 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。
// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 错误的做法。

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正确的做法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

原则三:对多次使用的RDD进行持久化

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

对多次使用的RDD进行持久化的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

Spark的持久化级别

持久化级别 含义解释
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。

如何选择一种最合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

groupByKey实现wordcount原理

reduceByKey实现wordcount原理

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。

使用reduceByKey/aggregateByKey替代groupByKey

详情见“原则五:使用map-side预聚合的shuffle操作”。

使用mapPartitions替代普通map

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

使用foreachPartitions替代foreach

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

广播大变量的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原则八:使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

1
2
3
4
5
6
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

资源调优

调优概述

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

Spark作业基本运行原理

Spark基本运行原理

详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

资源参数调优

了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/31/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为24个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/31/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为5001000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的23倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

1
2
3
4
5
6
7
8
9
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \

写在最后的话

根据实践经验来看,大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后,一般都能以较高的性能运行了,足以满足我们的需求。但是在不同的生产环境和项目背景下,可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。为了应对这些挑战,需要使用更高级的技巧来处理这类问题。在后续的《Spark性能优化指南——高级篇》中,我们会详细讲解数据倾斜调优以及Shuffle调优。

数据倾斜(转)

[TOC]

摘要

本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等。

一、为何要处理数据倾斜(Data Skew)

1.1 什么是数据倾斜

何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟,当机器数量增加到三时,理想的耗时为120 / 3 = 40分钟,如下图所示。

1
2
3
4
5
6
7
单机处理    ============================>

分布处理 =========>
分布处理 =========>
分布处理 =========>
-----------------------------
处理时间

但是,上述情况只是理想情况,实际上将单机任务转换成分布式任务后,会有overhead,使得总的任务量较之单机时有所增加,所以每台机器的执行时间加起来比单台机器时更大。这里暂不考虑这些overhead,假设单机任务转换成分布式任务后,总任务量不变。
但即使如此,想做到分布式情况下每台机器执行时间是单机时的1 / N,就必须保证每台机器的任务量相等。不幸的是,很多时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理80%的任务,另外两台机器各处理10%的任务,如下图所示。

1
2
3
4
5
6
7
单机处理    ============================>

分布处理 =======================>
分布处理 ====>
分布处理 ====>
-----------------------------
处理时间

在上图中,机器数据增加为三倍,但执行时间只降为原来的80%,远低于理想值。   

1.2 数据倾斜的危害

当出现数据倾斜时,个别任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。
另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。   

1.3 数据倾斜是如何造成的

在Spark中,同一个Stage的不同Partition可以并行处理,而具有依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage,且Stage 1依赖于Stage 0,那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task,这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1分钟,那该Stage的总时间至少为1分钟。换句话说,一个Stage所耗费的时间,主要由最慢的那个Task决定。

由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定。

Stage的数据来源主要分为如下两类

  • 从数据源直接读取。如读取HDFS,Kafka
  • 读取上一个Stage的Shuffle数据

二、如何缓解/消除数据倾斜

2.1 避免数据源的数据倾斜 — 读Kafka

以Spark Stream通过DirectStream方式读取Kafka数据为例。由于Kafka的每一个Partition对应Spark的一个Task(Partition),所以Kafka内相关Topic的各Partition之间数据是否平衡,直接决定Spark处理该数据时是否会产生数据倾斜。

如《Kafka设计解析(一)- Kafka背景及架构介绍》一文所述,Kafka某一Topic内消息在不同Partition之间的分布,主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner,则每条消息会随机发送到一个Partition中,从而从概率上来讲,各Partition间的数据会达到平衡。此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜。

但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据放于同一个Partition中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个Partition中。此时,如果产生了数据倾斜,则需要通过其它方式处理。

2.2 避免数据源的数据倾斜 — 读文件

2.2.1 原理

Spark以通过textFile(path, minPartitions)方法读取文件时,使用TextFileFormat。

对于不可切分的文件,每个文件对应一个Split从而对应一个Partition。此时各文件大小是否一致,很大程度上决定了是否存在数据源侧的数据倾斜。另外,对于不可切分的压缩文件,即使压缩后的文件大小一致,它所包含的实际数据量也可能差别很多,因为源文件数据重复度越高,压缩比越高。反过来,即使压缩文件大小接近,但由于压缩比可能差距很大,所需处理的数据量差距也可能很大。

此时可通过在数据生成端将不可切分文件存储为可切分文件,或者保证各文件包含数据量相同的方式避免数据倾斜。

对于可切分的文件,每个Split大小由如下算法决定。其中goalSize等于所有文件总大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小决定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size决定。

1
2
3
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

默认情况下各Split的大小不会太大,一般相当于一个Block大小(在Hadoop 2中,默认值为128MB),所以数据倾斜问题不明显。如果出现了严重的数据倾斜,可通过上述参数调整。

2.2.2 案例

现通过脚本生成一些文本文件,并通过如下代码进行简单的单词计数。为避免Shuffle,只计单词总个数,不须对单词进行分组计数。

1
2
3
4
5
6
SparkConf sparkConf = new SparkConf().setAppName("ReadFileSkewDemo");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
long count = javaSparkContext.textFile(inputFile, minPartitions)
.flatMap((String line) -> Arrays.asList(line.split(" ")).iterator()).count();
System.out.printf("total words : %s", count);
javaSparkContext.stop();

总共生成如下11个csv文件,其中10个大小均为271.9MB,另外一个大小为8.5GB。

uncompressedfiles

之后将8.5GB大小的文件使用gzip压缩,压缩后大小仅为25.3MB。
compressedfiles

使用如上代码对未压缩文件夹进行单词计数操作。Split大小为 max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9*10+8.5 *1024) / 1 MB, 128 MB) = 128MB。无明显数据倾斜。

splitable_unskewed

使用同样代码对包含压缩文件的文件夹进行同样的单词计数操作。未压缩文件的Split大小仍然为128MB,而压缩文件(gzip压缩)由于不可切分,且大小仅为25.3MB,因此该文件作为一个单独的Split/Partition。虽然该文件相对较小,但是它由8.5GB文件压缩而来,包含数据量是其它未压缩文件的32倍,因此处理该Split/Partition/文件的Task耗时为4.4分钟,远高于其它Task的10秒。
compressedfileskew

由于上述gzip压缩文件大小为25.3MB,小于128MB的Split大小,不能证明gzip压缩文件不可切分。现将minPartitions从默认的1设置为229,从而目标Split大小为max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 * 10+25.3) / 229 MB, 128 MB) = 12 MB。如果gzip压缩文件可切分,则所有Split/Partition大小都不会远大于12。反之,如果仍然存在25.3MB的Partition,则说明gzip压缩文件确实不可切分,在生成不可切分文件时需要如上文所述保证各文件数量大大致相同。

如下图所示,gzip压缩文件对应的Split/Partition大小为25.3MB,其它Split大小均为12MB左右。而该Task耗时4.7分钟,远大于其它Task的4秒。
unsplitable_skew

2.2.3总结

*适用场景 *
数据源侧存在不可切分文件,且文件内包含的数据量相差较大。

*解决方案 *
尽量使用可切分的格式代替不可切分的格式,或者保证各文件实际包含数据量大致相同。

*优势 *
可撤底消除数据源侧数据倾斜,效果显著。

*劣势 *
数据源一般来源于外部系统,需要外部系统的支持。

2.3 调整并行度分散同一个Task的不同Key

2.3.1 原理

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。

如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。
changeparallelism

2.3.2 案例

现有一张测试表,名为student_external,内有10.5亿条数据,每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5亿条数据,并通过一些处理,使得id为9亿到9.4亿间的所有数据对12取模后余数为8(即在Shuffle并行度为12时该数据集全部被HashPartition分配到第8个Task),其它数据集对其id除以100取整,从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中,而id小于9.4亿的数据全部分配到同一个Task中。处理过程如下

1
2
3
4
5
6
7
INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
ELSE CAST(id/100 AS INTEGER)
END,
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好。接下来,使用Spark读取该测试数据,并通过groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SparkDataSkew {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("SparkDataSkewTunning")
.config("hive.metastore.uris", "thrift://hadoop1:9083")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> dataframe = sparkSession.sql( "select * from test");
dataframe.toJavaRDD()
.mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))
.groupByKey(12)
.mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
int id = tuple._1();
AtomicInteger atomicInteger = new AtomicInteger(0);
tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
return new Tuple2<Integer, Integer>(id, atomicInteger.get());
}).count();

sparkSession.stop();
sparkSession.close();
}

}

本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16,内存为16GB。使用如下方式提交上述应用,将启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置,仅用于本文实验),可用内存为12GB。

1
spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

GroupBy Stage的Task状态如下图所示,Task 8处理的记录数为4500万,远大于(9倍于)其它11个Task处理的500万记录。而Task 8所耗费的时间为38秒,远高于其它11个Task的平均时间(16秒)。整个Stage的时间也为38秒,该时间主要由最慢的Task 8决定。

differentkeyskew12

在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task(即该例中的Task 8)的不同Key分配到不同Task,从而降低Task 8所需处理的数据量,缓解数据倾斜。

通过groupByKey(48)将Shuffle并行度调整为48,重新提交到Spark。新的Job的GroupBy Stage所有Task状态如下图所示。
differentkeyskew48

从上图可知,记录数最多的Task 20处理的记录数约为1125万,相比于并行度为12时Task 8的4500万,降低了75%左右,而其耗时从原来Task 8的38秒降到了24秒。

在这种场景下,调整并行度,并不意味着一定要增加并行度,也可能是减小并行度。如果通过groupByKey(11)将Shuffle并行度调整为11,重新提交到Spark。新Job的GroupBy Stage的所有Task状态如下图所示。
differentkeyskew11

从上图可见,处理记录数最多的Task 6所处理的记录数约为1045万,耗时为23秒。处理记录数最少的Task 1处理的记录数约为545万,耗时12秒。

2.3.3总结

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。

优势
实现简单,可在需要Shuffle的操作算子上直接设置并行度或者使用spark.default.parallelism设置。如果是Spark SQL,还可通过SET spark.sql.shuffle.partitions=[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。

劣势
适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

2.4 自定义Partitioner

2.4.1 原理

使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。

2.4.2 案例

以上述数据集为例,继续将并发度设置为12,但是在groupByKey算子上,使用自定义的Partitioner(实现如下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
.groupByKey(new Partitioner() {
@Override
public int numPartitions() {
return 12;
}

@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})

由下图可见,使用自定义Partition后,耗时最长的Task 6处理约1000万条数据,用时15秒。并且各Task所处理的数据集大小相当。
customizedpartition

2.4.3 总结

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。

优势
不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。

劣势
适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。

2.5 将Reduce side Join转变为Map side Join

2.5.1 原理

通过Spark的Broadcast机制,将Reduce侧Join转化为Map侧Join,避免Shuffle从而完全消除Shuffle带来的数据倾斜。
mapjoin

2.5.2 案例

通过如下SQL创建一张具有倾斜Key且总记录数为1.5亿的大表test。

1
2
3
4
5
6
INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
ELSE CAST(id/10 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

使用如下SQL创建一张数据分布均匀且总记录数为50万的小表test_new。

1
2
3
4
5
INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

直接通过Spark Thrift Server提交如下SQL将表test与表test_new进行Join并将Join结果存于表test_join中。

1
2
3
4
5
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

该SQL对应的DAG如下图所示。从该图可见,该执行过程总共分为三个Stage,前两个用于从Hive中读取数据,同时二者进行Shuffle,通过最后一个Stage进行Join并将结果写入表test_join中。
reducejoindag

从下图可见,Join Stage各Task处理的数据倾斜严重,处理数据量最大的Task耗时7.1分钟,远高于其它无数据倾斜的Task约2秒的耗时。
reducejoinlaststage

接下来,尝试通过Broadcast实现Map侧Join。实现Map侧Join的方法,并非直接通过CACHE TABLE test_new将小表test_new进行cache。现通过如下SQL进行Join。

1
2
3
4
5
6
CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通过如下DAG图可见,该操作仍分为三个Stage,且仍然有Shuffle存在,唯一不同的是,小表的读取不再直接扫描Hive表,而是扫描内存中缓存的表。
reducejoincachedag

并且数据倾斜仍然存在。如下图所示,最慢的Task耗时为7.1分钟,远高于其它Task的约2秒。
reducejoincachelaststage

正确的使用Broadcast实现Map侧Join的方式是,通过SET spark.sql.autoBroadcastJoinThreshold=104857600;将Broadcast的阈值设置得足够大。

再次通过如下SQL进行Join。

1
2
3
4
5
6
SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通过如下DAG图可见,该方案只包含一个Stage。
mapjoindag

并且从下图可见,各Task耗时相当,无明显数据倾斜现象。并且总耗时为1.5分钟,远低于Reduce侧Join的7.3分钟。
mapjoinlaststage

2.5.3 总结

适用场景
参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

解决方案
在Java/Scala代码中将小数据集数据拉取到Driver,然后通过Broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前,将Broadcast的阈值调整得足够大,从而使用Broadcast生效。进而将Reduce侧Join替换为Map侧Join。

优势
避免了Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。

劣势
要求参与Join的一侧数据集足够小,并且主要适用于Join的场景,不适合聚合的场景,适用条件有限。

2.6 为skew的key增加随机前/后缀

2.6.1 原理

为数据量特别大的Key增加随机前/后缀,使得原来Key相同的数据变为Key不相同的数据,从而使倾斜的数据集分散到不同的Task中,彻底解决数据倾斜问题。Join另一则的数据中,与倾斜Key对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜Key如何加前缀,都能与之正常Join。
randomprefix

2.6.2 案例

通过如下SQL,将id为9亿到9.08亿共800万条数据的id转为9500048或者9500096,其它数据的id除以100取整。从而该数据集中,id为9500048和9500096的数据各400万,其它id对应的数据记录数均为100条。这些数据存于名为test的表中。

对于另外一张小表test_new,取出50万条数据,并将id(递增且唯一)除以100取整,使得所有id都对应100条数据。

1
2
3
4
5
6
7
8
9
10
11
12
INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
ELSE CAST(id/100 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/100 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

通过如下代码,读取test表对应的文件夹内的数据并转换为JavaPairRDD存于leftRDD中,同样读取test表对应的数据存于rightRDD中。通过RDD的join算子对leftRDD与rightRDD进行Join,并指定并行度为48。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SparkDataSkew{
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
sparkConf.set("spark.default.parallelism", String.valueOf(parallelism));
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

leftRDD.join(rightRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

从下图可看出,整个Join耗时1分54秒,其中Join Stage耗时1.7分钟。
fewskewkeyjoinallstage

通过分析Join Stage的所有Task可知,在其它Task所处理记录数为192.71万的同时Task 32的处理的记录数为992.72万,故它耗时为1.7分钟,远高于其它Task的约10秒。这与上文准备数据集时,将id为9500048为9500096对应的数据量设置非常大,其它id对应的数据集非常均匀相符合。
fewskewkeyjoinlaststage

现通过如下操作,实现倾斜Key的分散处理

  • 将leftRDD中倾斜的key(即9500048与9500096)对应的数据单独过滤出来,且加上1到24的随机前缀,并将前缀与原数据用逗号分隔(以方便之后去掉前缀)形成单独的leftSkewRDD
  • 将rightRDD中倾斜key对应的数据抽取出来,并通过flatMap操作将该数据集中每条数据均转换为24条数据(每条分别加上1到24的随机前缀),形成单独的rightSkewRDD
  • 将leftSkewRDD与rightSkewRDD进行Join,并将并行度设置为48,且在Join过程中将随机前缀去掉,得到倾斜数据集的Join结果skewedJoinRDD
  • 将leftRDD中不包含倾斜Key的数据抽取出来作为单独的leftUnSkewRDD
  • 对leftUnSkewRDD与原始的rightRDD进行Join,并行度也设置为48,得到Join结果unskewedJoinRDD
  • 通过union算子将skewedJoinRDD与unskewedJoinRDD进行合并,从而得到完整的Join结果集

具体实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class SparkDataSkew{
public static void main(String[] args) {
int parallelism = 48;
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

String[] skewedKeyArray = new String[]{"9500048", "9500096"};
Set<String> skewedKeySet = new HashSet<String>();
List<String> addList = new ArrayList<String>();
for(int i = 1; i <=24; i++) {
addList.add(i + "");
}
for(String key : skewedKeyArray) {
skewedKeySet.add(key);
}

Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

JavaPairRDD<String, String> leftSkewRDD = leftRDD
.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));

JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);

JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
.join(rightSkewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));

skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

从下图可看出,整个Join耗时58秒,其中Join Stage耗时33秒。
fewskewkeyrandomjoinallstage

通过分析Join Stage的所有Task可知

  • 由于Join分倾斜数据集Join和非倾斜数据集Join,而各Join的并行度均为48,故总的并行度为96
  • 由于提交任务时,设置的Executor个数为4,每个Executor的core数为12,故可用Core数为48,所以前48个Task同时启动(其Launch时间相同),后48个Task的启动时间各不相同(等待前面的Task结束才开始)
  • 由于倾斜Key被加上随机前缀,原本相同的Key变为不同的Key,被分散到不同的Task处理,故在所有Task中,未发现所处理数据集明显高于其它Task的情况

fewskewkeyjoinrandomlaststage

实际上,由于倾斜Key与非倾斜Key的操作完全独立,可并行进行。而本实验受限于可用总核数为48,可同时运行的总Task数为48,故而该方案只是将总耗时减少一半(效率提升一倍)。如果资源充足,可并发执行Task数增多,该方案的优势将更为明显。在实际项目中,该方案往往可提升数倍至10倍的效率。

2.6.3 总结

适用场景
两张表都比较大,无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。

解决方案
将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。

优势
相对于Map则Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。

劣势
如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。

2.7 大表随机添加N种随机前缀,小表扩大N倍

2.7.1 原理

如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。
randomprefixandenlargesmalltable

2.7.2案例

这里给出示例代码,读者可参考上文中分拆出少数倾斜Key添加随机前缀的方法,自行测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SparkDataSkew {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

List<String> addList = new ArrayList<String>();
for(int i = 1; i <=48; i++) {
addList.add(i + "");
}

Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));

JavaPairRDD<String, String> rightNewRDD = rightRDD
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);

JavaPairRDD<String, String> joinRDD = leftRandomRDD
.join(rightNewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

2.7.3总结

适用场景
一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。

优势
对大部分场景都适用,效果不错。

劣势
需要将一个数据集整体扩大N倍,会增加资源消耗。

三、总结

对于数据倾斜,并无一个统一的一劳永逸的方法。更多的时候,是结合数据特点(数据集大小,倾斜Key的多少等)综合使用上文所述的多种方法。

四、来源

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势