import scala.math.random
import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}
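The only action in SparkPi is the final reduce(_ + _); parallelize and map merely build the lineage lazily. To make that visible in isolation, here is a minimal sketch that runs the same parallelize -> map -> reduce pipeline in-process (the object name LocalPiSketch, the local[*] master, the sample size, and the partition count are my choices, not part of the example above):

import org.apache.spark.sql.SparkSession
import scala.math.random

object LocalPiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")        // run in-process; no cluster needed
      .appName("Local Pi Sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val n = 10000
    // map is lazy; nothing runs until the reduce action below
    val inside = sc.parallelize(1 to n, 4).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }
    val count = inside.reduce(_ + _) // action: this is what submits a job
    println(s"Pi is roughly ${4.0 * count / n}")
    spark.stop()
  }
}

Nothing is computed until reduce is called; that call enters RDD.reduce, shown next.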
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  // Runs inside each task: fold the elements of one partition, or None if it is empty
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  // Runs on the driver: merge each partition's result into jobResult as it arrives
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
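reduce splits the work into two closures: reducePartition, which each task applies to its own partition, and mergeResult, which the driver applies to the per-partition Options as they come back. A plain-Scala sketch of the same two-level fold (the names ReduceSketch, partitions, and partial are mine, and local Seqs stand in for real partitions) may make the Option handling easier to follow:

object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq.empty, Seq(4, 5))
    val f: (Int, Int) => Int = _ + _

    // Per-partition step, mirroring reducePartition: an empty partition yields None
    val partial: Seq[Option[Int]] =
      partitions.map(p => if (p.nonEmpty) Some(p.reduceLeft(f)) else None)

    // Driver-side step, mirroring mergeResult: fold the defined partials together
    val jobResult: Option[Int] = partial.foldLeft(Option.empty[Int]) {
      case (Some(acc), Some(v)) => Some(f(acc, v))
      case (acc, None)          => acc
      case (None, some)         => some
    }

    // Prints 15; throws if every partition was empty, just like reduce above
    println(jobResult.getOrElse(throw new UnsupportedOperationException("empty collection")))
  }
}

The sc.runJob(this, reducePartition, mergeResult) call above eventually reaches the SparkContext.runJob overload below, which is where the job is handed to the DAGScheduler.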
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Generate the tasks and submit them via the DAGScheduler
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
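This overload of runJob is public, so it can also be called directly, which makes the roles of func (executed in each task) and resultHandler (executed on the driver) easy to see. A minimal sketch, assuming a local[*] master (the object name RunJobSketch and the sums buffer are mine):

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("runJob sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 100, 4)
    val sums = new Array[Int](rdd.partitions.length)

    sc.runJob[Int, Int](
      rdd,
      (_: TaskContext, iter: Iterator[Int]) => iter.sum,   // runs in each task
      0 until rdd.partitions.length,                       // all partitions
      (index: Int, partSum: Int) => sums(index) = partSum  // runs on the driver
    )

    println(s"per-partition sums: ${sums.mkString(", ")}, total = ${sums.sum}")
    spark.stop()
  }
}

Delivering results through a handler as tasks finish, rather than returning one array at the end, is what lets reduce merge its partial results incrementally in mergeResult.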