Launching Executors
1. Review
As covered in 《1、提交driver》, the receiveAndReply method of org.apache.spark.deploy.master.Master receives the RequestSubmitDriver message sent by the Client.
The launchDriver part of schedule() was covered earlier, in 《2、启动driver》.
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
  ...
}
```
This post continues with the other part of the schedule() method: launching Executors.
```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```
2. Preparation Before Launching Executors
Let's look at the startExecutorsOnWorkers method:
```scala
private def startExecutorsOnWorkers(): Unit = {
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
```
As the code above shows, waitingApps mainly carries the core and memory requirements we pass in on the command line. The job of startExecutorsOnWorkers is to schedule those waitingApps, i.e. to allocate their cores and memory onto concrete Workers. How the Driver registers an app with the Master is covered in 《Spark 任务调度之Register App》.
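For reference, the core and memory numbers carried by waitingApps originate from the application's configuration (or the equivalent spark-submit flags). A minimal sketch, assuming standalone mode and the standard configuration keys; the master URL and app name are placeholders:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: these standard keys are what the Master ultimately sees as the app's
// memory-per-executor, total-core, and cores-per-executor requirements.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // placeholder standalone master URL
  .setAppName("launch-executor-demo")
  .set("spark.executor.memory", "2g")      // per-executor memory
  .set("spark.cores.max", "8")             // upper bound on cores for the whole app
  .set("spark.executor.cores", "2")        // cores per executor (optional in standalone mode)
```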
In the scheduleExecutorsOnWorkers method, resources can be allocated with the spread-out strategy, which places Executors across as many Worker nodes as possible; the opposite, clustering Executors onto a few Worker nodes, is also supported. The choice is controlled by the spark.deploy.spreadOut parameter, which defaults to true (a simplified sketch of the two strategies follows the snippet below):
```scala
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
```
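To make the difference concrete, here is a simplified, self-contained sketch of the two strategies. It is not the real scheduleExecutorsOnWorkers (which also respects coresPerExecutor, per-worker memory, and app.coresLeft); it only shows how spreading versus packing distributes cores across workers:

```scala
// Simplified sketch of the two allocation strategies controlled by spark.deploy.spreadOut.
// freeCores(i) is the number of free cores on worker i; returns cores assigned per worker.
def assignCores(freeCores: Array[Int], coresNeeded: Int, spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var toAssign = coresNeeded
  if (spreadOut) {
    // Round-robin, one core per worker per pass: executors land on as many workers as possible.
    var pos = 0
    while (toAssign > 0 && freeCores.exists(_ > 0)) {
      if (freeCores(pos) > 0) {
        freeCores(pos) -= 1
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % freeCores.length
    }
  } else {
    // Packed: exhaust each worker before moving on, so executors cluster on fewer workers.
    var pos = 0
    while (toAssign > 0 && pos < freeCores.length) {
      val take = math.min(freeCores(pos), toAssign)
      assigned(pos) += take
      freeCores(pos) -= take
      toAssign -= take
      pos += 1
    }
  }
  assigned
}

// Example: 3 workers with 4 free cores each, app asks for 6 cores.
// spreadOut = true  -> Array(2, 2, 2)
// spreadOut = false -> Array(4, 2, 0)
```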
3. Launch Executor
At the end of startExecutorsOnWorkers, the allocateWorkerResourceToExecutors method is called:
```scala
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
```
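A quick worked example of the two sizing lines above (values are illustrative): with 8 cores assigned on a worker and coresPerExecutor = Some(2), four 2-core executors are launched on it; with coresPerExecutor = None, a single executor takes all 8 assigned cores.

```scala
// Illustrative values only: how numExecutors and coresToAssign are derived.
val assignedCores = 8

val withCoresPerExecutor: Option[Int] = Some(2)
withCoresPerExecutor.map(assignedCores / _).getOrElse(1)    // numExecutors  = 4
withCoresPerExecutor.getOrElse(assignedCores)               // coresToAssign = 2

val withoutCoresPerExecutor: Option[Int] = None
withoutCoresPerExecutor.map(assignedCores / _).getOrElse(1) // numExecutors  = 1
withoutCoresPerExecutor.getOrElse(assignedCores)            // coresToAssign = 8
```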
At the end of the method above, launchExecutor is called:
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
As the code above shows, the Master sends a LaunchExecutor message to the Worker node (and an ExecutorAdded message to the Driver). Once the Worker receives the message, the Master-side Launch Executor phase is finished; how the Executor actually starts on the Worker node is covered in the next part. Finally, the Worker handles the LaunchExecutor message with the following code:
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      ...
    }
  }
```
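The actual process start-up happens inside ExecutorRunner.start(): it spawns a thread that builds the executor's launch command and forks the executor JVM (in standalone mode this is normally CoarseGrainedExecutorBackend), then reports state changes back to the Worker; that is the subject of the next post. Below is a heavily simplified sketch of that thread-plus-process pattern only; the class and callback names are made up for illustration and are not Spark's real API:

```scala
import java.io.File
import scala.sys.process._

// A heavily simplified stand-in for ExecutorRunner (illustrative names, not Spark's API):
// spawn a thread, fork the executor process from a command, wait for it, report the exit code.
class SimpleExecutorRunner(command: Seq[String], workDir: File, onExit: Int => Unit) {
  def start(): Unit = {
    new Thread("simple-executor-runner") {
      override def run(): Unit = {
        // Fork the process in workDir and block until it exits.
        val exitCode = Process(command, workDir).!
        onExit(exitCode)
      }
    }.start()
  }
}

// Usage sketch: "launch" a dummy process and print its exit code.
// new SimpleExecutorRunner(Seq("java", "-version"), new File("."), code => println(s"exited: $code")).start()
```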
Summary
This post covered how the Master node launches Executors, in two steps:
- schedule waitingApps
- launch Executor
The overall flow: schedule() → startExecutorsOnWorkers() → scheduleExecutorsOnWorkers() → allocateWorkerResourceToExecutors() → launchExecutor() → the Worker receives LaunchExecutor and starts an ExecutorRunner.