
spark2 - Launching the Driver

Launching the Driver

The previous post walked through the flow from submitting a job on the command line to registering the Driver with the Master. The next step is for the Master to send the registered Driver information to a Worker and launch the Driver on that Worker node.

1. The Master Sends the LaunchDriver Message

The previous section showed how the Master (org.apache.spark.deploy.master.Master) receives the Driver registration request sent by ClientEndpoint. The Master then calls schedule() to launch the Driver and Executors:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      // Persist the Driver so it can be restored during master recovery
      persistenceEngine.addDriver(driver)
      // Register the Driver
      waitingDrivers += driver
      drivers.add(driver)
      // Launch the Driver and Executors
      schedule()
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
  // omitted
  ...
}
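
To see the request/reply shape above in isolation, here is a minimal, Spark-free sketch of the same pattern: validate the master's state, register the driver, then answer the caller. FakeSubmit and FakeResponse are hypothetical stand-ins for RequestSubmitDriver and SubmitDriverResponse, not Spark classes.

object SubmitSketch {
  case class FakeSubmit(mainClass: String)
  case class FakeResponse(success: Boolean, driverId: Option[String], message: String)

  private var alive = true
  private var nextId = 0
  private val waitingDrivers = scala.collection.mutable.ArrayBuffer[String]()

  def handleSubmit(req: FakeSubmit): FakeResponse = {
    if (!alive) {
      // A standby master rejects submissions, like the RecoveryState.ALIVE check above
      FakeResponse(success = false, None, "Can only accept driver submissions in ALIVE state.")
    } else {
      nextId += 1
      val driverId = s"driver-$nextId"
      waitingDrivers += driverId   // register, like waitingDrivers += driver
      // schedule() would be called here to try to place the driver on a worker
      FakeResponse(success = true, Some(driverId), s"Driver successfully submitted as $driverId")
    }
  }

  def main(args: Array[String]): Unit = {
    println(handleSubmit(FakeSubmit("org.apache.spark.examples.SparkPi")))
    alive = false   // simulate a standby master
    println(handleSubmit(FakeSubmit("org.apache.spark.examples.SparkPi")))
  }
}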

The schedule() method:

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Shuffle the Workers so Drivers are not concentrated on a few nodes
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Iterate over the waiting Drivers; launch each on a Worker with enough free memory and cores
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Send the LaunchDriver message to the chosen Worker node
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Start Executors on the Worker nodes; covered in "spark4 - Launching Executors"
  startExecutorsOnWorkers()
}
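
The placement policy is easier to see outside of the Master's bookkeeping. Below is a minimal, self-contained sketch (plain Scala, not Spark code) of the same shuffled round-robin loop; FakeWorker and FakeDriver are hypothetical stand-ins for WorkerInfo and DriverInfo.

import scala.util.Random

case class FakeWorker(id: String, var memoryFree: Int, var coresFree: Int)
case class FakeDriver(id: String, mem: Int, cores: Int)

object PlacementSketch {
  // Visit workers in random order and place each driver on the first worker
  // with enough free memory and cores, mirroring the loop in schedule().
  def place(workers: Seq[FakeWorker], drivers: Seq[FakeDriver]): Map[String, String] = {
    val shuffled = Random.shuffle(workers)
    val n = shuffled.size
    var curPos = 0
    val assignment = scala.collection.mutable.Map[String, String]()
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      while (visited < n && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem
          w.coresFree -= driver.cores
          assignment(driver.id) = w.id
          launched = true
        }
        curPos = (curPos + 1) % n
      }
    }
    assignment.toMap
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(FakeWorker("w1", 4096, 4), FakeWorker("w2", 2048, 2))
    val drivers = Seq(FakeDriver("d1", 1024, 1), FakeDriver("d2", 1024, 1))
    println(place(workers, drivers)) // e.g. Map(d1 -> w2, d2 -> w1), depending on the shuffle
  }
}

Because the worker list is shuffled on every call, repeated submissions tend to spread drivers across the cluster rather than piling them onto the first registered worker.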

schedule() calls the launchDriver(worker, driver) method:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // Send the LaunchDriver message to the Worker node via its RpcEndpointRef;
  // this Worker is the one picked by schedule() from the shuffled list
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

2. The Worker Node Launches the Driver

The receive method of the Worker (org.apache.spark.deploy.worker.Worker) receives and handles the LaunchDriver message, as shown below:

override def receive: PartialFunction[Any, Unit] = synchronized {
  // omitted
  // ...
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    // Wrap the Driver description in a DriverRunner
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    drivers(driverId) = driver
    // Launch the Driver
    driver.start()

    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
}

Take a look at DriverRunner's start method:

/** Starts a thread that runs and manages this driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // Download the driver's jar and run the driver locally
        val exitCode = prepareAndRunDriver()

        // Set finalState based on the exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Once the driver has finished, the Worker handles the state change in
      // org.apache.spark.deploy.worker.Worker#handleDriverStateChanged():
      //  - it forwards a DriverStateChanged message to the Master
      //  - and releases the resources the driver was using
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
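
The structure of start() — a dedicated thread, a shutdown hook that kills the driver, and an exit-code-to-state mapping — can be sketched without any Spark dependencies. The sketch below uses the JDK's Runtime.addShutdownHook instead of Spark's internal ShutdownHookManager, and runWork() is a hypothetical placeholder for prepareAndRunDriver().

import java.util.concurrent.atomic.AtomicBoolean

object RunnerSketch {
  sealed trait FinalState
  case object Finished extends FinalState
  case object Killed extends FinalState
  case object Failed extends FinalState

  private val killed = new AtomicBoolean(false)

  private def runWork(): Int = 0   // placeholder for the real driver process

  def start(driverId: String): Thread = {
    val t = new Thread("DriverRunner for " + driverId) {
      override def run(): Unit = {
        // Register a hook so a JVM shutdown marks the driver as killed
        val hook = new Thread(() => killed.set(true))
        Runtime.getRuntime.addShutdownHook(hook)
        val finalState =
          try {
            val exitCode = runWork()
            // Map the exit code to a final state, as DriverRunner does
            if (exitCode == 0) Finished
            else if (killed.get()) Killed
            else Failed
          } finally {
            Runtime.getRuntime.removeShutdownHook(hook)
          }
        // In the real code this is worker.send(DriverStateChanged(...))
        println(s"$driverId finished with state $finalState")
      }
    }
    t.start()
    t
  }

  def main(args: Array[String]): Unit = start("driver-0").join()
}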

The prepareAndRunDriver() method called from start() does the actual preparation and launch:

private[worker] def prepareAndRunDriver(): Int = {
  // Download the Driver jar to the Worker's local work dir, i.e. /path/to/examples.jar in this example
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)
  // Substitute workerUrl and localJarFilename into the command-line arguments
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // Build the process command for the driver; it will be executed with java
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
  // This step launches the Driver, i.e. runs the main method of /path/to/examples.jar
  runDriver(builder, driverDir, driverDesc.supervise)
}

The execution steps are annotated in the code comments above. In the end, Java's java.lang.ProcessBuilder is used to launch the Driver by executing a Linux command that looks roughly like this:

java -cp $SPARK_ASSEMBLY_JAR \
-Xms1024M -Xmx1024M -Dakka.loglevel=WARNING \
-Dspark.executor.memory=512m \
-Dspark.driver.supervise=false \
-Dspark.submit.deployMode=cluster \
-Dspark.app.name=org.apache.spark.examples.SparkPi \
-Dspark.rpc.askTimeout=10 \
-Dspark.master=$MasterUrl -XX:MaxPermSize=256m \
org.apache.spark.deploy.worker.DriverWrapper \
$WorkerUrl \
/path/to/examples.jar \
org.apache.spark.examples.SparkPi \
1000
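
For reference, here is a simplified sketch of how such a command can be launched with java.lang.ProcessBuilder (the class named above) and how its exit code is obtained. The command, working directory and log file name are assumptions for illustration, not the exact arguments Spark builds.

import java.io.File
import scala.collection.JavaConverters._

object ProcessBuilderSketch {
  def runCommand(command: Seq[String], workDir: File): Int = {
    val builder = new ProcessBuilder(command.asJava)
      .directory(workDir)
      .redirectErrorStream(true)                    // merge stderr into stdout
      .redirectOutput(new File(workDir, "stdout"))  // driver log, like the files under the worker's work dir
    val process = builder.start()
    process.waitFor()                               // the exit code decides the final DriverState
  }

  def main(args: Array[String]): Unit = {
    val exitCode = runCommand(
      Seq("java", "-cp", "/path/to/examples.jar",
        "org.apache.spark.examples.SparkPi", "1000"),
      new File("/tmp/driver-workdir"))              // assumed working directory; must exist
    println(s"driver exited with code $exitCode")
  }
}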

At this point, the /path/to/examples.jar uploaded via spark-submit starts running on the Worker node under java -cp. This is what "launching the Driver" means: the Driver is simply /path/to/examples.jar.
Finally, the Driver's execution state is reported back to the Master.

Summary

This post covered how the Master dispatches a registered Driver to a Worker and how the Driver is then launched on that Worker node.

References

Spark源码阅读:Driver的注册与启动