Launching the Driver

The previous sections walked through the flow from submitting a job on the command line up to the Master registering the Driver. This section covers the next step: the Master sends the registered Driver's information to a Worker, and the Driver is launched on that Worker node.
1. The Master sends the LaunchDriver message

As described in the previous section, the Master (org.apache.spark.deploy.master.Master) receives the Driver registration request sent by ClientEndpoint. It then calls schedule() to launch Drivers and Executors:
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
  // ... (other cases elided)
}
```
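For context, RequestSubmitDriver and SubmitDriverResponse are plain message case classes defined in org.apache.spark.deploy.DeployMessages. Roughly (the exact field list may differ slightly between Spark versions):

```scala
// Simplified excerpt of org.apache.spark.deploy.DeployMessages; not verbatim source
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

case class SubmitDriverResponse(
    master: RpcEndpointRef,
    success: Boolean,
    driverId: Option[String],
    message: String) extends DeployMessage
```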
The schedule() method:
```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```
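For a single waiting driver, the placement strategy boils down to: shuffle the ALIVE workers and pick the first one with enough free memory and cores (the curPos round-robin only matters when several drivers are placed in one pass). A minimal, self-contained sketch of that idea; FakeWorker and pickWorkerForDriver are made-up names, not Spark code:

```scala
import scala.util.Random

// Hypothetical, simplified model of a worker's free resources
case class FakeWorker(id: String, memoryFree: Int, coresFree: Int)

// Shuffle the alive workers, then take the first one that can fit the driver
def pickWorkerForDriver(
    aliveWorkers: Seq[FakeWorker],
    driverMem: Int,
    driverCores: Int): Option[FakeWorker] = {
  val shuffled = Random.shuffle(aliveWorkers)
  shuffled.find(w => w.memoryFree >= driverMem && w.coresFree >= driverCores)
}

// Example: a driver needing 1024 MB and 1 core lands on whichever shuffled
// worker first satisfies both constraints.
val workers = Seq(FakeWorker("w1", 512, 4), FakeWorker("w2", 4096, 8))
println(pickWorkerForDriver(workers, driverMem = 1024, driverCores = 1))
```

If no worker currently fits, the driver simply stays in waitingDrivers and is tried again the next time schedule() runs.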
schedule() then calls launchDriver(worker, driver):
```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```
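The LaunchDriver message sent to the Worker is likewise a small DeployMessages case class that carries only the driver id and its description, roughly:

```scala
// Simplified; the exact field list varies across Spark versions
case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
```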
2. The Worker launches the Driver

The Worker's (org.apache.spark.deploy.worker.Worker) receive method receives and handles the LaunchDriver message as follows:
```scala
override def receive: PartialFunction[Any, Unit] = synchronized {
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    drivers(driverId) = driver
    driver.start()

    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
}
```
Next, look at DriverRunner's start method:
```scala
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        val exitCode = prepareAndRunDriver()

        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
```
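The structure here is worth calling out: the driver is run on its own thread so a long-running or hung driver never blocks the Worker's message loop, and a JVM shutdown hook makes sure the forked driver process does not outlive the Worker. A self-contained sketch of that pattern, not Spark code (names like DriverRunnerPatternSketch are made up):

```scala
// Sketch of "dedicated thread + shutdown hook", the pattern DriverRunner.start() uses
object DriverRunnerPatternSketch extends App {
  val runnerThread = new Thread("driver-runner-sketch") {
    override def run(): Unit = {
      // If the JVM goes down while the driver is running, this hook fires
      val hook = new Thread(() => println("JVM shutting down, kill the driver process"))
      Runtime.getRuntime.addShutdownHook(hook)
      try {
        // Placeholder for prepareAndRunDriver(): launch the process and wait for it
        println("running driver ...")
      } finally {
        // The hook is only needed while the driver is running
        Runtime.getRuntime.removeShutdownHook(hook)
      }
    }
  }
  runnerThread.start()
  runnerThread.join()
}
```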
```scala
private[worker] def prepareAndRunDriver(): Int = {
  // 1. Create this driver's working directory under the Worker's work dir
  val driverDir = createWorkingDirectory()
  // 2. Download the user jar into that directory
  val localJarFilename = downloadUserJar(driverDir)

  // 3. Substitute the placeholders in the driver command's arguments
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // 4. Build the ProcessBuilder for the driver JVM command
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  // 5. Start the process and wait for it, returning its exit code
  runDriver(builder, driverDir, driverDesc.supervise)
}
```
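The {{WORKER_URL}} and {{USER_JAR}} placeholders in the driver command are rewritten to concrete values before the process is built. A small self-contained sketch of that substitution, with assumed example values:

```scala
val workerUrl = "spark://Worker@host:7078"      // assumed example value
val localJarFilename = "/path/to/examples.jar"  // assumed example value

def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}" => localJarFilename
  case other => other
}

val rawArgs = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "org.apache.spark.examples.SparkPi", "1000")
println(rawArgs.map(substituteVariables))
// List(spark://Worker@host:7078, /path/to/examples.jar, org.apache.spark.examples.SparkPi, 1000)
```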
The execution steps follow the numbered comments in prepareAndRunDriver above. Ultimately, the Driver is launched by executing a Linux command through Java's java.lang.ProcessBuilder class; the command looks roughly like this:
```bash
java -cp $SPARK_ASSEMBLY_JAR \
  -Xms1024M -Xmx1024M -Dakka.loglevel=WARNING \
  -Dspark.executor.memory=512m \
  -Dspark.driver.supervise=false \
  -Dspark.submit.deployMode=cluster \
  -Dspark.app.name=org.apache.spark.examples.SparkPi \
  -Dspark.rpc.askTimeout=10 \
  -Dspark.master=$MasterUrl -XX:MaxPermSize=256m \
  org.apache.spark.deploy.worker.DriverWrapper \
  $WorkerUrl \
  /path/to/example.jar \
  org.apache.spark.examples.SparkPi \
  1000
```
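To make the mechanism concrete, here is a minimal, self-contained sketch of launching such a command with java.lang.ProcessBuilder, roughly what DriverRunner does with the builder returned by CommandUtils.buildProcessBuilder; all paths and class names below are placeholders, not the actual Spark implementation:

```scala
import java.io.File

object LaunchDriverProcessSketch extends App {
  // Placeholder command; in Spark this comes from the DriverDescription's Command
  val command = Seq(
    "java", "-cp", "/path/to/examples.jar",
    "org.apache.spark.examples.SparkPi", "1000")

  val builder = new ProcessBuilder(command: _*)
  builder.directory(new File("/tmp/driver-workdir"))              // driver working directory
  builder.redirectOutput(new File("/tmp/driver-workdir/stdout"))  // capture stdout to a file
  builder.redirectError(new File("/tmp/driver-workdir/stderr"))   // capture stderr to a file

  val process = builder.start()
  val exitCode = process.waitFor()  // block until the driver JVM exits
  println(s"driver process exited with code $exitCode")
}
```

Using ProcessBuilder rather than a bare shell invocation lets the Worker control the working directory, the environment, and where stdout/stderr go, which is how the driver's logs end up under its work directory.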
At this point, the /path/to/examples.jar uploaded via spark-submit starts running on the Worker node under a java -cp command. This is what "launching the Driver" means: the so-called Driver is simply the user application packaged in /path/to/examples.jar. Finally, the Driver's execution state is reported back to the Master.
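Note that the entry point in the command is not the user class directly but org.apache.spark.deploy.worker.DriverWrapper, which roughly speaking sets up a WorkerWatcher (so the driver exits if its Worker dies), puts the user jar on the classpath, and then invokes the user's main class via reflection. A minimal, self-contained sketch of that last reflection step; runUserMain is a made-up name, not Spark code:

```scala
// Load the user's main class by name and invoke its static main(Array[String])
def runUserMain(mainClassName: String, args: Array[String]): Unit = {
  val clazz = Class.forName(mainClassName)
  val mainMethod = clazz.getMethod("main", classOf[Array[String]])
  mainMethod.invoke(null, args)  // static method, so the receiver is null
}

// e.g. runUserMain("org.apache.spark.examples.SparkPi", Array("1000"))
```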
Summary

This section covered how the Master sends the registered Driver to a Worker (the LaunchDriver message) and how the Worker then launches the Driver process via DriverRunner.