近几年大数据飞速发展,数据研发的工具集也越来越完备。以往做数据研发可能需要手写 MAP REDUCE,效率还不高。现在可以说是 80%以上的任务都可以通过 SQL 搞定。
在主流的大数据工具集中,前有 Hive SQL, 后有 Spark SQL,Flink SQL。开源社区通过不断努力让 SQL 打通数据流的每一个环节。下面就从最近读取的论文[SparkSQLSigmod2015]去窥探 SPARK SQL 的实现逻辑。
Spark SQL 系统位图
从系统位图上 Catalyst 在 Spark SQL 中起承上启下的作用。
Catalyst查询编译器
用户编写的SQL是无法直接被底层计算框架执行,必须要经过几个转换阶段,转变成框架能够识别的代码或者类对象,在Spark中,一般需要经过以下几个步骤,分为逻辑执行计划部分和物理执行计划部分。
SQL Query,需要经过词法和语法解析,由字符串转换为树形的抽象语法树, 通过遍历抽象语法树生成未解析的逻辑语法树(Unresolved LogicPlan),对应SQL解析后的一种树形结构,本身不包含任务数据信息,需要经过一次遍历之后,转换成成包含解析后的逻辑算子树(Analyzed LogicPlan),本身携带了各种信息,最后经过优化后得到最终的逻辑语法树(Optimized LogicPlan)。最后都要转化成RDD的调用代码,才能被 Spark core所执行。
Parser
- SqlContent 先通过 SparkSqlParser 生成语法树。
- Spark1.x 版本使用的是 Scala 原生的 Parser 语法解析器,从 2.x 后改用的是第三方语法解析工具 ANTLR4,只需要定制好语法,可以通过插件自动生成对应的解析代码。
- 然后通过 AstBuilder 配合 ANTLR4 的 Visitor 模式自主控制遍历 Tree,将 antlr 里面的节点都替换成catalyst(优化器系统)里面的类型,所有的类型都继承了 TreeNode 特质,TreeNode 又有子节点 children: Seq[BaseType],便有了树的结构。
- 此过程解析完后形成的AST(抽象语法树)为 Unresolved LogicalPlan。
Analyzer
- 上个步骤还只是把 SQL 字符串通过 antlr4 拆分并由 SparkSqlParser 解析成各种 LogicalPlan(TreeNode的子类),每个 LogicalPlan 究竟是什么意思还不知道。
- 接下来就需要通过 Analyzer 去把不确定的属性和关系,通过 catalog 和一些适配器方法确定下来,比如要从Catalog 中解析出表名 user,是临时表、临时 view,hive table 还是 hive view,schema 又是怎么样的等都需要确定下来。
- 将各种 Rule 应用到 Tree 之上的真正执行者都是 RuleExecutor,包括后面的 Optimizer 也继承了RuleExecutor, 解析的套路是递归的遍历,将新解析出来的 LogicalPlan 来替换原来的 LogicalPlan。
- 此过程解析完后形成的AST为 Resolved LogicalPlan。若没有action操作,后续的优化,物理计划等都不会执行。
Optimizer
- 这个步骤是根据经验来对SQL进行优化,比如谓词下推、列值裁剪、常量累加等。
- Optimizer 也继承了 RuleExecutor,并定义了一批规则,和 Analyzer 一样对输入的 plan 进行递归处理,此过程解析完后形成的AST为 optimized LogicalPlan。
SparkPlanner
通过优化后的 LogicalPlan 还只是逻辑上的,接下来需要通过 SparkPlanner 将 Optimized LogicalPlan 应用到一系列特定的 Strategies 上,即转化为可以直接操作真实数据的操作及数据和RDD的绑定等,此过程解析完后形成的 AST 为 PhysicalPlan。
PrepareForExecution
此模块将 PhysicalPlan 转化为 Executable PhysicalPlan,主要是插入 shuffle 操作和 internal row 的格式转换。
CBO (Cost Based Optimizer) 优化
Join Cardinality Estimation
• Inner-Join: The number of rows of “A join B on A.k1 = B.k1” is estimated as: num(A B) = num(A) * num(B) / max(distinct(A.k1), distinct(B.k1)), – where num(A) is the number of records in table A, distinct is the number of distinct values of that column. – The underlying assumption for this formula is that each value of the smaller domain is included in the larger domain.
• We similarly estimate cardinalities for Left-Outer Join, Right-Outer Join and Full-Outer Join
Q11 SQL
SELECT c_customer_id customer_id, c_first_name customer_first_name, c_last_name customer_last_name, c_preferred_cust_flag customer_preferred_cust_flag, c_birth_country customer_birth_country, c_login customer_login, c_email_address customer_email_address, d_year dyear, sum(ss_ext_list_price - ss_ext_discount_amt) year_total, 's' sale_type FROM customer, store_sales, date_dim WHERE c_customer_sk = ss_customer_sk AND ss_sold_date_sk = d_date_sk GROUP BY c_customer_id, c_first_name, c_last_name, d_year , c_preferred_cust_flag, c_birth_country, c_login, c_email_address, d_year UNION ALL SELECT c_customer_id customer_id, c_first_name customer_first_name, c_last_name customer_last_name, c_preferred_cust_flag customer_preferred_cust_flag, c_birth_country customer_birth_country, c_login customer_login, c_email_address customer_email_address, d_year dyear, sum(ws_ext_list_price - ws_ext_discount_amt) year_total, 'w' sale_type FROM customer, web_sales, date_dim WHERE c_customer_sk = ws_bill_customer_sk AND ws_sold_date_sk = d_date_sk GROUP BY c_customer_id, c_first_name, c_last_name, c_preferred_cust_flag, c_birth_country, c_login, c_email_address, d_year
Spark SQL VS Hive SQL
采样了近期 4000 多个任务,其中 2500 个 Spark SQL 任务,2000 个 Hive SQL 任务,绘制平均耗时对照图如下。不加额外的优化,切换到 Spark SQL,执行效率提升 3 倍。
当然 Hive SQL 切换到 Spark SQL 中还是会有一些小问题。比如在 2.3 版本之前
set hive.exec.orc.split.strategy=ETL;
- 对上游表生成的文件数做限制,保证不会产生空文件(reduce 数控制)
- hive 支持上下文的环境变量设置 BI --> ETL --> BI
参考
- https://amplab.cs.berkeley.edu/wp-content/uploads/2015/03/SparkSQLSigmod2015.pdf
- https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html
- https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
- https://issues.apache.org/jira/browse/SPARK-19809
- 📎2byinhuai-160614224024.pdf
- 📎ss2017costbasedoptimizer-170608221811.pdf
- 📎SparkSQLSigmod2015.pdf