VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 数据库 > sql数据库 >
  • sql语句大全之spark sql逻辑计划(优化完)转物理

在org.apache.spark.sql.execution中实现了有所有的数据库操作,但是注意这里仅仅是物理算子,这些操作分为三类:UnaryNode,LeafNode和BinaryNode。
 
一元节点UnaryNode的操作有:
 
Aggregate,DebugNode,EXchange,Filter,Generate,Project,Sample,Sort,StopAfter,TopK。
 
二元节点BinaryNode的操作有:
 
BroadcastNestedLoopJoin,CartesianProduct,SparkEquiInnerJoin。
 
叶子节点LeftNode的操作有:
 
ExistingRdd,ParquetTableScan。
 
分析一下join操作,join有两个孩子节点,是二元算子,其中会添加projection算子。有一种情况,比如T1表的a,b,c三个属性和T2表的a,d,e三个属性,如果在T1和T2表的a属性上做连接,最后输出三个属性T1.a,T1.b,T2.d。这样的话首先会在T1表上添加projection将a,b属性选出来,然后在T2表上添加Projection将a,d属性选出来,然后连接选出的属性,SparkEquiInnerJoin物理算子如下:
 
case class SparkEquiInnerJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,//左孩子,下面是右孩子
    right: SparkPlan) extends BinaryNode {
  //outputPartitioning是基类中的函数,在此只是重载。partition策略如何
  override def outputPartitioning: Partitioning = left.outputPartitioning
  //requiredChildDistribution是基类中的函数,在此是重载。孩子数据分布如何
  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
  def output = left.output ++ right.output
  def execute() = attachTree(this, "execute") {
    val leftWithKeys = left.execute().mapPartitions { iter =>
      //因为会有left孩子节点和leftWithKeys不一样的schema,所以要根据leftKeys做一次projection操作
      val generateLeftKeys = new Projection(leftKeys, left.output)
      iter.map(row => (generateLeftKeys(row), row.copy()))
    }
 
    val rightWithKeys = right.execute().mapPartitions { iter =>
      //因为会有left孩子节点和leftWithKeys不一样的schema,所以要根据leftKeys做一次projection操作      
      val generateRightKeys = new Projection(rightKeys, right.output)
      iter.map(row => (generateRightKeys(row), row.copy()))
    }
 
    // Do the join.做连接,连接左和右,得到所有的结果
    val joined = filterNulls(leftWithKeys).joinLocally(filterNulls(rightWithKeys))
    // Drop join keys and merge input tuples.
    // build每一行数据,就是简单的将生成的leftTuple和rightTuple相加
    joined.map { case (_, (leftTuple, rightTuple)) => buildRow(leftTuple ++ rightTuple) }
  }
 
  /**
   * Filters any rows where the any of the join keys is null, ensuring three-valued
   * logic for the equi-join conditions.
   */
  protected def filterNulls(rdd: RDD[(Row, Row)]) =
    rdd.filter {
      case (key: Seq[_], _) => !key.exists(_ == null)
    }
}
在上面的join物理算子中execute函数式每个物理操作都必须实现的函数,还有两个函数outputPartitioning和requiredChildDistribution,这两个函数的作用呆会探讨。
 
lazy val sparkPlan = planner(optimizedPlan).next()  
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)  
上面这两步中第一步是从优化过的逻辑执行计划生成物理执行计划,在此调用的QueryPlanner抽象类中的apply方法,这个方法将逻辑执行计划中的每个操作对应strategies中相应的case class来生成具体的物理操作,现在strategies有这么多个,可以在SqlContext类中找到,如下:
 
    /*
     * 所有的策略都在这里。
     * */
    val strategies: Seq[Strategy] =
      TopK ::
      PartialAggregation ::
      SparkEquiInnerJoin ::
      ParquetOperations ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil
第二步会调用outputPartitioning和requiredChildDistribution这两个函数,是从两个child的分区策略和数据分布情况来确定是否需要添加shuffle操作,具体代码是在Exchange.scala中的AddExchange函数中,在此对物理执行计划分析看是否需要添加shuffle。
 
以上分析基于spark sql的从优化完的逻辑执行计划到物理执行计划的生成,总体来说分两方面,一方面是利用strategies讲逻辑算子转化为物理算子,还有一方面,因为是分布式系统,少不了的需要在物理执行计划中添加shuffle,接下来:
 
1,具体分析什么情况下需要添加AddExchange。
 
2,需要了解所有的物理操作算子,特别是join算子的变形。
 
本文完
 
 
--------------------- 
作者:egraldloi 
来源:CSDN 
原文:https://blog.csdn.net/egraldloi/article/details/23139645 
版权声明:本文为博主原创文章,转载请附上博文链接!

相关教程