The spark-submit submission process

This article covers, in Standalone mode, the process from submitting a job on the command line with spark-submit to the point where the Driver is handed off to the Master.
1. Submitting a job with spark-submit

Below is a command-line submission example from the Spark documentation; it is also the starting point for our analysis of job scheduling.
```bash
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://xxxx \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
```
Note that deploy-mode is cluster in the command above. For how the spark-submit script itself works, see the previous article《spark0-spark_submit脚本执行逻辑》(on the execution logic of the spark-submit script).
The command that is ultimately executed is:
```bash
/opt/jdk1.8/bin/java \
  -Dhdp.version=2.6.0.3-8 \
  -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ \
  org.apache.spark.deploy.SparkSubmit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://xxxx \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
```
2. org.apache.spark.deploy.SparkSubmit

The shell command invokes SparkSubmit's main function. Its main responsibilities are: processing the incoming configuration and setting it as JVM system properties (from which the Driver, RestSubmissionClient, or Client later reads the configuration), and loading the configured jars with a class loader.
```scala
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      // exception handling elided in this excerpt
    }
  } else {
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }
}

if (args.isStandaloneCluster && args.useRest) {
  try {
    printStream.println("Running Spark using the REST application submission protocol.")
    doRunMain()
  } catch {
    // exception handling elided in this excerpt
  }
} else {
  doRunMain()
}
```
childMainClass is then launched via reflection. In our example, childMainClass is org.apache.spark.deploy.Client. The relevant code in runMain is:
```scala
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    // exception handling elided in this excerpt
  }

  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }

  try {
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    // exception handling elided in this excerpt
  }
}
```
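To make the reflection step concrete, here is a minimal, self-contained sketch (not Spark source) of the same mechanism: load a class by name with the context class loader, look up its static main method, and invoke it. The DemoApp object and its arguments are made up for illustration only.

```scala
import java.lang.reflect.Modifier

// A stand-in for the child main class; SparkSubmit would use e.g. org.apache.spark.deploy.Client here.
object DemoApp {
  def main(args: Array[String]): Unit =
    println(s"DemoApp started with args: ${args.mkString(" ")}")
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val childMainClass = "DemoApp"          // in Spark this comes from prepareSubmitEnvironment
    val childArgs = Seq("--arg1", "value1") // illustrative arguments

    // Load the class with the context class loader, as runMain does.
    val mainClass = Class.forName(childMainClass, true, Thread.currentThread.getContextClassLoader)

    // Scala objects get a static forwarder for main, so getMethod finds a static method here too.
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "the main method must be static")

    // Invoke the static method: the receiver is null, the single argument is the args array.
    mainMethod.invoke(null, childArgs.toArray)
  }
}
```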
3. SparkSubmit sends a message to the Master to register the Driver

The class that SparkSubmit launches via reflection (childMainClass) can be one of several types, depending on the mode:
org.apache.spark.deploy.Client
org.apache.spark.deploy.yarn.Client
org.apache.spark.deploy.rest.RestSubmissionClient
the user-defined application class
Here we use the standalone cluster mode as the example, in which SparkSubmit runs org.apache.spark.deploy.Client.main() via reflection.
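The mapping from deploy mode to childMainClass is decided inside prepareSubmitEnvironment. The sketch below is a simplified illustration of that decision, not the actual Spark source; the Mode trait and the helper function are assumptions introduced for readability, while the class names come from the list above.

```scala
// A simplified sketch (not the actual Spark source) of how prepareSubmitEnvironment
// chooses childMainClass; the case structure is an illustrative assumption.
object ChildMainClassSketch {
  sealed trait Mode
  case object StandaloneClusterLegacy extends Mode // standalone cluster, legacy RPC submission
  case object StandaloneClusterRest   extends Mode // standalone cluster, REST submission
  case object YarnCluster             extends Mode
  case object ClientMode              extends Mode // driver runs in the submitting JVM

  def childMainClass(mode: Mode, userMainClass: String): String = mode match {
    case StandaloneClusterLegacy => "org.apache.spark.deploy.Client"
    case StandaloneClusterRest   => "org.apache.spark.deploy.rest.RestSubmissionClient"
    case YarnCluster             => "org.apache.spark.deploy.yarn.Client"
    case ClientMode              => userMainClass // the user's application class itself
  }

  def main(args: Array[String]): Unit =
    println(childMainClass(StandaloneClusterLegacy, "org.apache.spark.examples.SparkPi"))
}
```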
Client's main method first parses the incoming arguments and then creates an RpcEnv object, as shown below:
```scala
object Client {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}
```
The onStart() method of ClientEndpoint then sends a message to the Master to register the Driver:
```scala
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)

      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
```
As shown above, the Driver description mainly consists of jarUrl, memory, cores, supervise, and command. All of these are self-explanatory except command, so let's look at what Command contains. (The {{WORKER_URL}} and {{USER_JAR}} placeholders in its arguments are substituted by the Worker when it actually launches the driver.)
```scala
private[spark] case class Command(
    mainClass: String, // here: org.apache.spark.deploy.worker.DriverWrapper
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}
```
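To make this more tangible, the following sketch shows roughly what the launch branch above would build for our SparkPi submission. CommandSketch is a simplified local mirror of Spark's Command (which is private[spark] and cannot be instantiated directly here), and the concrete values are assumptions derived from the submit command at the top of this article.

```scala
// A simplified local mirror of Spark's private[spark] Command, for illustration only.
case class CommandSketch(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])

object CommandExample {
  def main(args: Array[String]): Unit = {
    // Roughly what the "launch" branch builds for the SparkPi submission above;
    // {{WORKER_URL}} and {{USER_JAR}} are filled in later by the Worker.
    val command = CommandSketch(
      mainClass = "org.apache.spark.deploy.worker.DriverWrapper",
      arguments = Seq("{{WORKER_URL}}", "{{USER_JAR}}",
        "org.apache.spark.examples.SparkPi", "1000"),
      environment = sys.env,
      classPathEntries = Seq.empty,  // from spark.driver.extraClassPath, if set
      libraryPathEntries = Seq.empty, // from spark.driver.extraLibraryPath, if set
      javaOpts = Seq("-Dspark.executor.memory=20g")) // sparkJavaOpts ++ extraJavaOpts (illustrative)
    println(command)
  }
}
```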
At this point, the Client side of submitting the Driver is complete.
4. The Master handles the RequestSubmitDriver message

The receiveAndReply method of the Master (org.apache.spark.deploy.master.Master) receives the RequestSubmitDriver message sent by the Client and registers the received Driver in waitingDrivers.
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    // ...

  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()

      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }

  case RequestKillDriver(driverId) =>
    // ...

  case RequestDriverStatus(driverId) =>
    // ...

  case RequestMasterState =>
    // ...

  case BoundPortsRequest =>
    // ...

  case RequestExecutors(appId, requestedTotal) =>
    // ...

  case KillExecutors(appId, executorIds) =>
    // ...
}
```
5. Summary

This article covered, for deploy-mode=cluster, the process from submitting a job on the command line to the Master receiving and registering the Driver. The complete flow is as follows.