3.9 基于DataSet的代码到底是如何一步步转化成为RDD的
基于DataSet的代码转换为RDD之前需要一个Action的操作,基于Spark中的新解析引擎Catalyst进行优化,Spark中的Catalyst不仅限于SQL的优化,Spark的五大子框架(Spark Cores、Spark SQL、Spark Streaming、Spark GraphX、Spark Mlib)将来都会基于Catalyst基础之上。
Dataset.scala的collect方法的源码如下。
Spark 2.1.1版本的Dataset.scala的源码如下。
1. def collect(): Array[T] = collect(needCallback = true)
Spark 2.2.0版本Dataset.scala的源码与Spark 2.1.1版本相比具有如下特点:将Dataset的action包裹起来,这样可跟踪QueryExecution和时间成本,然后汇报给用户注册的回调函数。
1. def collect(): Array[T] = withAction("collect", queryExecution) (collectFromPlan)
进入collect(needCallback:true)的方法如下。
Spark 2.1.1版本的Dataset.scala的源码如下。
1. private def collect(needCallback: Boolean): Array[T] = { 2. def execute(): Array[T] = withNewExecutionId { 3. queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow) 4. } 5. 6. if (needCallback) { 7. withCallback("collect", toDF())(_ => execute()) 8. } else { 9. execute() 10. } 11. }
Spark 2.2.0版本的Dataset.scala的源码与Spark 2.1.1版本相比具有如下特点。
调用关键的代码SQLExecution.withNewExecutionId(sparkSession,qe){action(qe. executedPlan)}得到计算结果。
action(qe.executedPlan)是collect方法中传入的函数collectFromPlan,在函数collectFromPlan中传入参数qe.executedPlan。
1. private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { 2. try { 3. qe.executedPlan.foreach { plan => 4. plan.resetMetrics() 5. } 6. val start = System.nanoTime() 7. val result = SQLExecution.withNewExecutionId(sparkSession, qe) { 8. action(qe.executedPlan) 9. } 10. val end = System.nanoTime() 11. sparkSession.listenerManager.onSuccess(name, qe, end - start) 12. result 13. } catch { 14. case e: Exception => 15. sparkSession.listenerManager.onFailure(name, qe, e) 16. throw e 17. } 18. }
Spark 2.2.0版本的Dataset.scala的源码从spark plan中获取所有的数据。
1. private def collectFromPlan(plan: SparkPlan): Array[T] = { 2. plan.executeCollect().map(boundEnc.fromRow) 3. }
collect方法中关键的一行代码是queryExecution.executedPlan.executeCollect().map (boundEnc.fromRow),我们看一下executedPlan。executedPlan不用来初始化任何SparkPlan,仅用于执行。
QueryExecution.scala的源码如下。
1. class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { 2. ...... 3. //executePlan不应该被用来初始化任何Spark Plan,executePlan只用于执行 4. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) 5. ...... 6. lazy val toRdd: RDD[InternalRow] = executedPlan.execute() 7. ......
queryExecution.executedPlan.executeCollect()代码中的executeCollect方法运行此查询,将结果作为数组返回。executeCollect方法调用了byteArrayRdd.collect()方法。
SparkPlan .scala的executeCollect的源码如下。
1. def executeCollect(): Array[InternalRow] = { 2. val byteArrayRdd = getByteArrayRdd() 3. 4. val results = ArrayBuffer[InternalRow]() 5. byteArrayRdd.collect().foreach { bytes => 6. decodeUnsafeRows(bytes).foreach(results.+=) 7. } 8. results.toArray 9. }
byteArrayRdd.collect()方法调用RDD.scala的collect方法。collect方法最终通过sc.runJob提交Spark集群运行。
RDD.scala的collect方法的源码如下。
1. def collect(): Array[T] = withScope { 2. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) 3. Array.concat(results: _*) 4. }
回到QueryExecution.scala中,executedPlan.execute()是关键性的代码。
1. lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
进入SparkPlan.scala的execute返回的查询结果类型为RDD[InternalRow]。调用doExecute执行,SparkPlan应重写doExecute进行具体实现。在execute方法中就生成了RDD[InternalRow]。execute方法的源码如下。
1. final def execute(): RDD[InternalRow] = executeQuery { 2. doExecute() 3. }
SparkPlan.scala的doExecute()抽象方法没有具体实现,通过SparkPlan重写具体实现。产生的查询结果作为RDD[InternalRow]。
1. protected def doExecute(): RDD[InternalRow]
InternalRow是通过语法树生成的一些数据结构。其子类包括BaseGenericInternalRow、JoinedRow、Row、UnsafeRow。
InternalRow.scala的源码如下。
1. abstract class InternalRow extends SpecializedGetters with Serializable { 2. ...... 3. def setBoolean(i: Int, value: Boolean): Unit = update(i, value) 4. def setByte(i: Int, value: Byte): Unit = update(i, value) 5. def setShort(i: Int, value: Short): Unit = update(i, value) 6. def setInt(i: Int, value: Int): Unit = update(i, value) 7. def setLong(i: Int, value: Long): Unit = update(i, value) 8. def setFloat(i: Int, value: Float): Unit = update(i, value) 9. def setDouble(i: Int, value: Double): Unit = update(i, value) 10. ........
DataSet的代码转化成为RDD的内部流程如下。
Parse SQL(DataSet)→Analyze Logical Plan→Optimize Logical Plan→Generate Physical Plan→Prepareed Spark Plan→Execute SQL→Generate RDD
基于DataSet的代码一步步转化成为RDD:最终调用execute()生成RDD。