
Launching the Driver

The previous posts walked through the flow from submitting a job on the command line to the Master registering the Driver. The next step is for the Master to send the registered Driver information to a Worker and launch the Driver on that Worker node.

1. The Master sends the LaunchDriver message

The previous section showed the Master (org.apache.spark.deploy.master.Master) receiving the Driver registration message sent by ClientEndpoint. The Master then calls schedule() to launch the Driver and the Executors:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      // persist the Driver so it can be restored during master recovery
      persistenceEngine.addDriver(driver)
      // register the Driver
      waitingDrivers += driver
      drivers.add(driver)
      // launch the Driver and Executors
      schedule()
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
  // omitted
  ...
}

The schedule() method:

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // shuffle the Workers so Drivers are not concentrated on a few nodes
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // iterate over the Workers; if a Worker has enough free memory and cores, launch the Driver there
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // send the LaunchDriver message to the chosen Worker node
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // launch Executors on the Worker nodes; covered in "spark4 - Launching Executors"
  startExecutorsOnWorkers()
}
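To see the placement policy on its own, here is a minimal, self-contained Scala sketch of the same shuffle-then-round-robin logic. FakeWorker, FakeDriver, and all of the numbers are made up for illustration; this is not Spark's WorkerInfo/DriverInfo.

import scala.util.Random

case class FakeWorker(id: String, var memoryFree: Int, var coresFree: Int)
case class FakeDriver(id: String, mem: Int, cores: Int)

object DriverPlacementSketch {
  // returns driverId -> workerId for every driver that could be placed
  def placeDrivers(workers: Seq[FakeWorker], drivers: Seq[FakeDriver]): Map[String, String] = {
    val shuffled = Random.shuffle(workers)          // spread drivers across workers
    val numWorkersAlive = shuffled.size
    var curPos = 0
    var assignments = Map.empty[String, String]
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      while (visited < numWorkersAlive && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem                // reserve the resources
          w.coresFree -= driver.cores
          assignments += (driver.id -> w.id)
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive     // round-robin to the next worker
      }
    }
    assignments
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(FakeWorker("worker-1", 4096, 4), FakeWorker("worker-2", 1024, 1))
    val drivers = Seq(FakeDriver("driver-1", 2048, 2))
    println(placeDrivers(workers, drivers))         // e.g. Map(driver-1 -> worker-1)
  }
}

Shuffling first keeps drivers from piling up on the first few workers, and the round-robin cursor means a single pass visits each alive worker at most once per driver.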

schedule() calls launchDriver(worker, driver):

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // send the LaunchDriver message to the Worker node via its RpcEndpointRef;
  // this Worker is the one schedule() picked from the shuffled list
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

2. The Worker node launches the Driver

The receive method of Worker (org.apache.spark.deploy.worker.Worker) receives and handles the LaunchDriver message as follows:

override def receive: PartialFunction[Any, Unit] = synchronized {
  // omitted
  // ...
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    // wrap the Driver information in a DriverRunner
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    drivers(driverId) = driver
    // start the Driver
    driver.start()

    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
}

Now look at DriverRunner's start method:

/** Starts a thread to run and manage this driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // download the driver's jars and run the driver locally
        val exitCode = prepareAndRunDriver()

        // set finalState based on the exitCode
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Once the Driver has finished running,
      // the Worker executes handleDriverStateChanged() in org.apache.spark.deploy.worker.Worker:
      //
      // - the Worker sends a DriverStateChanged message to the Master
      // - the related resources are released
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
private[worker] def prepareAndRunDriver(): Int = {
  // download the Driver jar to the Worker's local disk, i.e. /path/to/examples.jar in this example
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)
  // substitute workerUrl and localJarFilename into the arguments
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // assemble the Driver's command into a Linux command line,
  // which will then be executed via java
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
  // this step launches the Driver, i.e. runs the main method in /path/to/examples.jar
  runDriver(builder, driverDir, driverDesc.supervise)
}

The execution steps are annotated in the code above. In the end, the Driver is launched by running a Linux command through Java's java.lang.ProcessBuilder class. The command looks roughly like this:

java -cp $SPARK_ASSEMBLY_JAR \
-Xms1024M -Xmx1024M -Dakka.loglevel=WARNING \
-Dspark.executor.memory=512m \
-Dspark.driver.supervise=false \
-Dspark.submit.deployMode=cluster \
-Dspark.app.name=org.apache.spark.examples.SparkPi \
-Dspark.rpc.askTimeout=10 \
-Dspark.master=$MasterUrl -XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.DriverWrapper \
$WorkerUrl \
/path/to/example.jar \
org.apache.spark.examples.SparkPi \
1000
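To make that last step concrete, here is a minimal Scala sketch of launching a command with java.lang.ProcessBuilder, the JDK class DriverRunner ultimately relies on. The command ("java -version") and the working directory are placeholders standing in for the real command produced by CommandUtils.buildProcessBuilder and for the driver directory created by createWorkingDirectory().

import java.io.File
import scala.collection.JavaConverters._

object LaunchDriverSketch {
  def main(args: Array[String]): Unit = {
    // placeholder for the long "java -cp ... DriverWrapper ..." command shown above
    val command = Seq("java", "-version")
    val workingDir = new File(".")

    val builder = new ProcessBuilder(command.asJava)
    builder.directory(workingDir)        // run inside the driver's working directory
    builder.redirectErrorStream(true)    // merge stderr into stdout

    val process = builder.start()
    val exitCode = process.waitFor()     // DriverRunner maps this to FINISHED / FAILED
    println(s"driver process exited with code $exitCode")
  }
}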

At this point, the /path/to/examples.jar uploaded with spark-submit starts running on the Worker node via a java -cp command; this is what launching the Driver means: the Driver is /path/to/examples.jar.
Finally, the Driver's execution state is reported back to the Master.

Summary

This post covered how the Master dispatches the registered Driver to a Worker and how the Driver is then launched on the Worker node.

References

Spark源码阅读:Driver的注册与启动

The spark-submit submission process

This post describes, in Standalone mode, the process from submitting a job on the command line with spark-submit to the Driver being submitted to the Master.

1. Submitting a job with spark-submit

Below is a command-line submission example from the Spark docs; it is also the starting point of our analysis of job scheduling.

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://xxxx \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

Note that deploy-mode in the command above is cluster. For what the spark-submit script itself does, see the previous post "spark0 - spark_submit script execution logic".

It ultimately runs as:

/opt/jdk1.8/bin/java \
-Dhdp.version=2.6.0.3-8 \
-cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ \
org.apache.spark.deploy.SparkSubmit \
--class org.apache.spark.examples.SparkPi \
--master spark://xxxx \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

2. org.apache.spark.deploy.SparkSubmit

The shell invokes SparkSubmit's main function. Its main logic is to process the incoming configuration and set it as system properties / environment variables (the Driver, RestSubmissionClient, or Client reads the configuration from there), and to load the configured jars with a class loader.

// Parse and prepare the four values below; childMainClass is the one to pay attention to.
// childMainClass differs depending on the Spark mode:
// - org.apache.spark.deploy.Client
// - org.apache.spark.deploy.yarn.Client
// - org.apache.spark.deploy.rest.RestSubmissionClient
// - the user-defined application class
//
// In this example it is org.apache.spark.deploy.Client
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

// declare the doRunMain() method
def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      // omitted
    }
  } else {
    // usually this branch is taken
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }
}

if (args.isStandaloneCluster && args.useRest) {
  try {
    printStream.println("Running Spark using the REST application submission protocol.")
    doRunMain()
  } catch {
    // omitted
  }
// In all other modes, just run the main class as prepared
} else {
  doRunMain()
}

childMainClass is then started via reflection.

In this example childMainClass is org.apache.spark.deploy.Client.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  //...
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // add the jars to the loader as URLs
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  // system properties
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }
  // look up the main class via reflection
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // omitted
  }
  // omitted

  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  // omitted

  // invoke the main class via reflection
  try {
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    // omitted
  }
}
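The reflection pattern used by runMain is plain JDK API. Below is a minimal, self-contained Scala sketch of it; DemoApp is a made-up stand-in for the real childMainClass, and a plain URLClassLoader replaces Spark's MutableURLClassLoader.

import java.net.{URL, URLClassLoader}

// a tiny class standing in for the user application / childMainClass (e.g. SparkPi)
object DemoApp {
  def main(args: Array[String]): Unit =
    println("DemoApp.main invoked with args: " + args.mkString(", "))
}

object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    // start from an empty URL array, as with MutableURLClassLoader(new Array[URL](0), ...);
    // in SparkSubmit the user's jars are then added via addJarToClasspath
    val loader = new URLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader)
    Thread.currentThread.setContextClassLoader(loader)

    // resolve the main class by name and invoke its static main, as runMain does
    val mainClass = Class.forName("DemoApp", true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, Array[String]("1000"))
  }
}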

3. SparkSubmit sends a message to register the Driver with the Master

The class SparkSubmit starts via reflection (childMainClass) varies with the deployment mode:

  • org.apache.spark.deploy.Client
  • org.apache.spark.deploy.yarn.Client
  • org.apache.spark.deploy.rest.RestSubmissionClient
  • the user-defined application class

Here we walk through standalone cluster mode, where SparkSubmit runs org.apache.spark.deploy.Client.main() via reflection.
Client's main method first processes the incoming arguments and then creates an RpcEnv object.
As follows:

object Client {
  def main(args: Array[String]) {
    //....

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)
    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // create the RpcEnv
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // obtain the endpoint references used to talk to the Master
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    // register the ClientEndpoint
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}

ClientEndpoint's onStart() method then sends a message to the Master to register the Driver:

override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // Spark launches the user app's main function through DriverWrapper rather than directly,
      // so that the Driver shares fate with the Worker process that launched it
      // (the source comments call this "share fate"):
      // if that Worker dies, the corresponding Driver is stopped as well.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)
      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      // the command to run;
      // mainClass here is org.apache.spark.deploy.worker.DriverWrapper
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      // send the RequestSubmitDriver message to the Master to register the Driver
      ayncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}

As shown above, the Driver information mainly consists of jarUrl, memory, cores, supervise, and command.
Everything except command is self-explanatory, so let's look at what Command contains:

private[spark] case class Command(
    mainClass: String, // DriverWrapper
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}
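To make this concrete, here is a small Scala sketch of roughly what the Command built in onStart() contains for the SparkPi example in this post. The case class below is a local copy for illustration only (the real one is private[spark]), and the javaOpts value is illustrative rather than taken from a real run.

case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])

object CommandSketch {
  def main(args: Array[String]): Unit = {
    val command = Command(
      mainClass = "org.apache.spark.deploy.worker.DriverWrapper",
      // the Worker substitutes {{WORKER_URL}} and {{USER_JAR}} before running the command
      arguments = Seq("{{WORKER_URL}}", "{{USER_JAR}}",
        "org.apache.spark.examples.SparkPi", "1000"),
      environment = sys.env,
      classPathEntries = Seq.empty,
      libraryPathEntries = Seq.empty,
      javaOpts = Seq("-Dspark.master=spark://192.168.1.20:7077"))
    println(command.mainClass + " " + command.arguments.mkString(" "))
  }
}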

At this point the Client's Driver submission flow is complete.

4. The Master handles the RequestSubmitDriver message

The receiveAndReply method of Master (org.apache.spark.deploy.master.Master) receives the RequestSubmitDriver message sent by the Client and adds the received Driver to waitingDrivers.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>

  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      // check the Master's state; report an error if it is not ALIVE
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      // persist the Driver so it can be restored during master recovery
      persistenceEngine.addDriver(driver)
      // register the Driver
      waitingDrivers += driver
      drivers.add(driver)
      // launch the Driver and Executors; analyzed in the next post
      schedule()
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }

  case RequestKillDriver(driverId) =>
    //...

  case RequestDriverStatus(driverId) =>
    //...

  case RequestMasterState =>
    //...

  case BoundPortsRequest =>
    //...

  case RequestExecutors(appId, requestedTotal) =>
    //...

  case KillExecutors(appId, executorIds) =>
    //...
}

5. Summary

This post covered the deploy-mode=cluster flow from submitting a job on the command line to the Master receiving and registering the Driver.

Analysis of spark-submit

A spark-submit example:

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://192.168.1.20:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 2G \
--total-executor-cores 5 \
/path/to/examples.jar \
1000

The main flow of the spark-submit script:

  1. Run the ./bin/spark-submit script

    • Set SPARK_HOME, disable randomized string hashing (for Python), and call ./bin/spark-class
  2. Run the ./bin/spark-class script

    • Run ./bin/load-spark-env.sh: set SPARK_HOME, source ./conf/spark-env.sh, and determine the Scala version and Scala home
    • Set SPARK_HOME
    • Find JAVA_HOME, or confirm that the java command is available
    • Add the Spark and Scala jars to LAUNCH_CLASSPATH
    • Finally run java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main plus the relevant arguments

Contents of the spark-submit script

The script is fairly simple: it passes its arguments on to spark-class.
$@ stands for the list of all arguments passed to the script.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
# so the same string always produces the same hash value
export PYTHONHASHSEED=0

# call ./bin/spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Contents of the spark-class script

Before going through the logic of spark-class, let's look at what load-spark-env.sh does.

load-spark-env.sh

It mainly exports the variables defined in conf/spark-env.sh and determines the Scala version and the Scala home directory.

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

if [ -z "$SPARK_ENV_LOADED" ]; then
  export SPARK_ENV_LOADED=1

  # Returns the parent of the directory this script lives in.
  parent_dir="${SPARK_HOME}"

  user_conf_dir="${SPARK_CONF_DIR:-"$parent_dir"/conf}"

  if [ -f "${user_conf_dir}/spark-env.sh" ]; then
    # Promote all variable declarations to environment (exported) variables
    set -a
    . "${user_conf_dir}/spark-env.sh"
    set +a
  fi
fi

# Setting SPARK_SCALA_VERSION if not already set.

if [ -z "$SPARK_SCALA_VERSION" ]; then

  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"

  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
    echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
    exit 1
  fi

  if [ -d "$ASSEMBLY_DIR2" ]; then
    export SPARK_SCALA_VERSION="2.11"
  else
    export SPARK_SCALA_VERSION="2.10"
  fi
fi

The spark-class script in detail

# locate the Spark home directory
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# load load-spark-env.sh, which sets up the runtime environment,
# e.g. conf/spark-env.sh
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find assembly jar
# locate the spark-assembly-1.5.0-hadoop2.4.0.jar file (using Spark 1.5.0 as an example);
# this means the assembly jar does not need to be bundled when submitting a job
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
  ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
  ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
# check whether Spark has been built
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
# there must be exactly one Spark assembly jar
if [ -d "$ASSEMBLY_DIR" ]; then
  ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
    echo "$ASSEMBLY_JARS" 1>&2
    echo "Please remove all but one jar." 1>&2
    exit 1
  fi
fi
# the resulting ASSEMBLY_JAR path
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi


# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=() # create an array
while IFS= read -d '' -r ARG; do # append each value printed by build_command to the CMD array
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]} # array length
LAST=$((COUNT - 1)) # array length - 1
LAUNCHER_EXIT_CODE=${CMD[$LAST]} # the last element of the array, i.e. the $? value appended above

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

# if the launcher's exit code is non-zero, exit with that code
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}") # CMD now holds the command built by the launcher (exit code stripped)
exec "${CMD[@]}" # execute it

The final command that gets run looks roughly like:

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ \
org.apache.spark.deploy.SparkSubmit \
--master spark://192.168.1.20:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--executor-memory 2G \
--total-executor-cores 5 \
../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar \
1000

References

Spark源码阅读: Spark Submit任务提交

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

Introduction: When we study an algorithm, what we care about most is usually not the algorithm itself but what it can do and where it can be applied. We often need to extract useful information from large amounts of data; finding hidden relationships between items in large-scale data is called association analysis or association rule learning. For example, in everyday shopping, certain products sell better when bundled together, and the recommendations in an online store are generated from users' searches and purchases. A brute-force search would be far too expensive, which is exactly the kind of problem Apriori was designed to solve.

Outline

  • Association analysis
  • Apriori algorithm theory
  • Apriori implementation
    • Frequent itemset generation
    • Association rule generation
  • References

The Apriori algorithm

  • Pros: easy to implement
  • Cons: can be slow on large datasets
  • Suitable data types: numeric or nominal values

1 Association analysis

As the name suggests, association analysis is about some connection between two things. Its most famous example is "diapers and beer": a chain store in the western United States noticed that men who bought diapers on Thursdays also tended to buy beer, so the store inferred that diapers and beer were somehow associated. But how do we actually measure such an association?

Here, support and confidence are used to make that judgment.

The support of an itemset is defined as the proportion of records in the dataset that contain that itemset. Confidence is defined for a specific association rule; for example, for the rule {diapers} -> {beer}, the confidence is defined as support({diapers, beer}) / support({diapers}).

For a rule X => Y, the support is the number of transactions containing X ∪ Y divided by the total number of transactions (that is, the probability that X and Y appear together in a transaction). The confidence is the number of transactions containing X ∪ Y divided by the number of transactions containing X, i.e. the conditional probability of Y given X. The goal of association rule mining is to discover such rules automatically and to measure their quality.

The formulas are:
$$
\text{support}(X \Rightarrow Y) = \frac{\text{number of transactions containing } X \cup Y}{\text{total number of transactions}}
$$

$$
\text{confidence}(X \Rightarrow Y) = \frac{\text{number of transactions containing } X \cup Y}{\text{number of transactions containing } X}
$$

Support and confidence are how we quantify whether an association analysis is successful. Association analysis has two goals: finding frequent itemsets and finding association rules. We first find the frequent itemsets and then derive the association rules from them. Below we use the Apriori algorithm to find the frequent itemsets.
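As a hand-worked example (using the small sample dataset returned by loadDataSet() later in this post, [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]):

$$
\text{support}(\{2,5\}) = \frac{3}{4} = 0.75,\qquad \text{support}(\{2\}) = \frac{3}{4} = 0.75
$$

$$
\text{confidence}(\{2\} \Rightarrow \{5\}) = \frac{\text{support}(\{2,5\})}{\text{support}(\{2\})} = \frac{0.75}{0.75} = 1.0
$$

so every transaction that contains item 2 also contains item 5.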

2 Apriori theory

The general workflow of the algorithm:

  • Collect data: any method
  • Prepare data: any data type works, since we only store sets
  • Analyze data: any method
  • Train the algorithm: use Apriori to find the frequent itemsets
  • Test the algorithm: no test step is needed
  • Use the algorithm: to find frequent itemsets and association rules between items

With Apriori, we first compute the support of every single item and keep the items whose support exceeds the threshold we set, e.g. 0.5 or 0.7. We then grow the itemsets by combining items: any combination whose support exceeds the threshold is added to the frequent itemsets, and we repeat this recursively.

Association rules are then generated from the frequent itemsets selected by support.

3 Apriori implementation

First, a couple of helper functions.
Loading the dataset:

from numpy import *

def loadDataSet():
    dataSet = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
    return dataSet

Build the set C1 from the dataset; C1 is the set of all candidate itemsets of size 1.

def createC1(dataSet):
    C1 = []  # C1 is the set of all candidate itemsets of size 1
    for transaction in dataSet:
        for item in transaction:
            if not [item] in C1:
                C1.append([item])
    C1.sort()
    return map(frozenset, C1)  # use frozenset so we can use it as a key in a dict

From the candidate itemsets, keep the ones whose support is at least the minimum support we specify:

# D is the dataset, Ck the candidate itemsets, minSupport the minimum support you choose
def scanD(D, Ck, minSupport):
    ssCnt = {}
    for tid in D:  # count how often each candidate itemset appears
        for can in Ck:
            if can.issubset(tid):
                if not ssCnt.has_key(can): ssCnt[can] = 1
                else: ssCnt[can] += 1
    numItems = float(len(D))
    retList = []  # itemsets that meet the minimum support
    supportData = {}  # dict of the support of every itemset
    for key in ssCnt:  # compute the support of all itemsets
        support = ssCnt[key]/numItems
        if support >= minSupport:
            retList.insert(0, key)
        supportData[key] = support
    return retList, supportData
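As a quick hand-computed sanity check (my own arithmetic, not output from the original post): running scanD on the sample dataset with the size-1 candidates and minSupport = 0.5 gives

$$
\text{support}(\{1\}) = 0.5,\quad \text{support}(\{2\}) = 0.75,\quad \text{support}(\{3\}) = 0.75,\quad \text{support}(\{4\}) = 0.25,\quad \text{support}(\{5\}) = 0.75
$$

so retList keeps {1}, {2}, {3}, {5} and drops {4}.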

3.1 Frequent itemsets

Frequent itemset generation is split out into its own functions.
First we need a function that merges two itemsets into a larger candidate:

# Lk is a list of frequent itemsets; k is the number of items in each merged itemset,
# e.g. {1,2,3} corresponds to k=3
def aprioriGen(Lk, k):  # creates Ck
    retList = []
    lenLk = len(Lk)
    for i in range(lenLk):
        for j in range(i+1, lenLk):
            L1 = list(Lk[i])[:k-2]; L2 = list(Lk[j])[:k-2]  # merge two sets when their first k-2 items are equal
            L1.sort(); L2.sort()
            if L1 == L2:  # if first k-2 elements are equal
                retList.append(Lk[i] | Lk[j])  # set union
    return retList

Next, the function that generates the frequent itemsets:

# only the dataset and the minimum support are needed
def apriori(dataSet, minSupport=0.5):
    C1 = createC1(dataSet)
    D = map(set, dataSet)
    L1, supportData = scanD(D, C1, minSupport)
    L = [L1]
    k = 2
    while (len(L[k-2]) > 0):
        Ck = aprioriGen(L[k-2], k)
        Lk, supK = scanD(D, Ck, minSupport)  # scan DB to get Lk
        supportData.update(supK)
        L.append(Lk)
        k += 1
    return L, supportData  # return the frequent itemsets and the support of each itemset
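Working the loop through by hand on the sample dataset with minSupport = 0.5 (again my own calculation, for orientation), the frequent itemsets come out as

$$
L_1 = \{\{1\},\{2\},\{3\},\{5\}\},\qquad L_2 = \{\{1,3\},\{2,3\},\{2,5\},\{3,5\}\},\qquad L_3 = \{\{2,3,5\}\}
$$

after which aprioriGen produces no new candidates and the loop stops.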

3.2 Association rule generation

From the frequent itemsets we can derive rules, but how exactly are they produced? Below is a rule-generation function; see the comments for the details.

# parameters: the frequent itemsets, the support dict, and the minimum confidence;
# returns a list of rules that meet the confidence threshold
def generateRules(L, supportData, minConf=0.7):  # supportData is the dict produced by scanD
    bigRuleList = []
    for i in range(1, len(L)):  # only use frequent itemsets with two or more items, e.g. {1,2}, {1,2,3}, not {2}, {3}, etc.
        for freqSet in L[i]:
            H1 = [frozenset([item]) for item in freqSet]
            if (i > 1):
                rulesFromConseq(freqSet, H1, supportData, bigRuleList, minConf)
            else:
                calcConf(freqSet, H1, supportData, bigRuleList, minConf)
    return bigRuleList

Next we define a function that computes confidence and extracts the rules that meet our threshold. For example, if freqSet is {1,2} and H is {1},{2}, we can compute the confidence of {1} -> {2} and of {2} -> {1} (the conf variable below) and keep a rule only if it passes the if test:

# compute confidence and keep the rules that meet the minimum confidence
def calcConf(freqSet, H, supportData, brl, minConf=0.7):
    prunedH = []  # create new list to return
    for conseq in H:
        conf = supportData[freqSet]/supportData[freqSet-conseq]  # calc confidence
        if conf >= minConf:
            print freqSet-conseq, '-->', conseq, 'conf:', conf
            brl.append((freqSet-conseq, conseq, conf))
            prunedH.append(conseq)
    return prunedH
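For example, take the frequent itemset {1, 3} from the sample data (a hand-worked check):

$$
\text{conf}(\{1\} \Rightarrow \{3\}) = \frac{\text{support}(\{1,3\})}{\text{support}(\{1\})} = \frac{0.5}{0.5} = 1.0 \ge 0.7,\qquad
\text{conf}(\{3\} \Rightarrow \{1\}) = \frac{0.5}{0.75} \approx 0.67 < 0.7
$$

so {1} --> {3} is kept while {3} --> {1} is pruned.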

The following function merges consequents. For example, if the current frequent itemset is {2,3,5}, built from {2},{3},{5}, then {2},{3},{5} are merged pairwise and the resulting rules are scored with calcConf above:

# generate more rules from the initial itemset
def rulesFromConseq(freqSet, H, supportData, brl, minConf=0.7):
    m = len(H[0])
    if (len(freqSet) > (m + 1)):  # try to merge the consequents further
        Hmp1 = aprioriGen(H, m+1)  # create Hm+1 new candidates
        Hmp1 = calcConf(freqSet, Hmp1, supportData, brl, minConf)
        if (len(Hmp1) > 1):  # need at least two sets to merge
            rulesFromConseq(freqSet, Hmp1, supportData, brl, minConf)
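Tracing this by hand for the frequent itemset {2, 3, 5} from the sample data (with minConf = 0.7): aprioriGen merges {2},{3},{5} into the candidate consequents {2,3},{2,5},{3,5}, and calcConf then scores

$$
\text{conf}(\{5\} \Rightarrow \{2,3\}) = \text{conf}(\{3\} \Rightarrow \{2,5\}) = \text{conf}(\{2\} \Rightarrow \{3,5\}) = \frac{0.5}{0.75} \approx 0.67 < 0.7
$$

so none of these rules survive the threshold.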

Gradient descent is mainly used to find the particular parameters that minimize the loss function.

  • Loss function:

$$
J(\theta)=\frac{1}{2m}\sum_{i=1}^{m}\left(h_{\theta}(x^{(i)})-y^{(i)}\right)^2
$$

The iterative process

The original post showed a figure (WX20180315-175706) illustrating the gradient descent iterations.

Update rule:

$$
\theta_j := \theta_j-\alpha\frac{\partial}{\partial \theta_j}J(\theta)
$$

Here $\alpha$ is the learning rate: if $\alpha$ is too large the minimum can be overshot, and if it is too small the updates become very slow.

Simplifying the partial derivative

$$
\begin{aligned}
\frac{\partial}{\partial \theta_j}J(\theta) &= \frac{\partial}{\partial \theta_j}\frac{1}{2}\left(h_{\theta}(x)-y\right)^2 \\
&= 2\cdot\frac{1}{2}\left(h_{\theta}(x)-y\right)\cdot\frac{\partial}{\partial \theta_j}\left(h_{\theta}(x)-y\right) \\
&= \left(h_{\theta}(x)-y\right)\cdot\frac{\partial}{\partial \theta_j}\left(\sum_{i=1}^{n}\theta_{i}x_{i}-y\right) \\
&= \left(h_{\theta}(x)-y\right)\cdot x_{j}
\end{aligned}
$$

Update rule for a single training example

$$\theta_j:=\theta_j+\alpha(y^{(i)}-h_{\theta}(x^{(i)})) x^{(i)}_{j}$$
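As a quick numeric illustration (a toy example of my own, not from the original post): take $h_{\theta}(x)=\theta_{0}x_{0}+\theta_{1}x_{1}$ with $x_{0}=1$, start from $\theta_{0}=\theta_{1}=0$, set $\alpha=0.1$, and use the single sample $(x_{1}, y)=(2, 4)$. One update step gives

$$
\theta_{0} := 0 + 0.1\,(4-0)\cdot 1 = 0.4,\qquad \theta_{1} := 0 + 0.1\,(4-0)\cdot 2 = 0.8
$$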

Update rule for multiple training examples

$$\theta_j:=\theta_j+\alpha\sum_{i=1}^{m}(y^{(i)}-h_{\theta}(x^{(i)})) x^{(i)}_{j}$$

  • Hypothesis: $$h_{\theta}(x)=\theta_{0}x_{0}+\theta_{1}x_{1}+\dots+\theta_{n}x_{n}$$
  • Loss function (with an L1 regularization term): $$J(\theta)=\frac{1}{2m}\sum_{i=1}^{m}(h_{\theta}(x^{(i)})-y^{(i)})^2+\lambda\sum_{j=1}^{n}|\theta_{j}|$$
  • Objective: $$\min J(\theta_{0},\theta_{1},\theta_{2},\dots,\theta_{n})$$

The general form of regression:

  • Hypothesis: $h_{\theta}(x)=\theta_{0}x_{0}+\theta_{1}x_{1}+\dots+\theta_{n}x_{n}=\theta^TX$
  • Loss function: $J(\theta)=\frac{1}{2m}\sum_{i=1}^{m}(h_{\theta}(x^{(i)})-y^{(i)})^2$
  • Objective: $\min J(\theta_{0},\theta_{1},\theta_{2},\dots,\theta_{n})$

Here n is the number of features and m the number of samples; the 1/2 in the loss function is only there to make the math more convenient.

  • Hypothesis: $h_{\theta}(x)=\theta_{0}x_{0}+\theta_{1}x_{1}+\dots+\theta_{n}x_{n}$
  • Loss function (with an L2 regularization term): $J(\theta)=\frac{1}{2m}\sum_{i=1}^{m}(h_{\theta}(x^{(i)})-y^{(i)})^2+\lambda\sum_{j=1}^{n}\theta_{j}^2$
  • Objective: $\min J(\theta_{0},\theta_{1},\theta_{2},\dots,\theta_{n})$

1. Install Kafka

brew install kafka

This command automatically installs the zookeeper dependency.

You can run brew info kafka to see the installation details.

2. Edit server.properties

Run vi /usr/local/etc/kafka/server.properties
and add the following line:

#listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092

3. Start ZooKeeper

zkServer start

4. Start Kafka

kafka-server-start /usr/local/etc/kafka/server.properties

5. Test Kafka

Start a producer:

kafka-console-producer --topic [topic-name] --broker-list localhost:9092   # the listeners address configured in step 2

Start a consumer:

kafka-console-consumer --bootstrap-server localhost:9092 --topic [topic-name] --from-beginning

Notes on the parameters used above:

  • --broker-list: the broker address the console producer connects to
  • --bootstrap-server: the broker address the console consumer connects to
  • --from-beginning: read all records from the beginning of the topic (not verified)

Note

If Kafka is deployed as a cluster, the command-line consumer apparently has to be started on the Kafka cluster itself; I'm not sure exactly why, and it may be an issue with my own configuration.

References

Kafka official quick start