Tech Whims

Spark 3.x Journey of Discovery | Kyuubi 1.7 Overview and Core Deployment Parameter Tuning

张晓龙 / 2023-04-17



To "cut costs and improve efficiency", our big data platform is replacing Hive with Spark as the compute engine. To make better use of Spark, we introduced Kyuubi as middleware: it serves as the entry point for submitting Spark jobs and manages Spark sessions and users.

Below is a brief introduction to Kyuubi and a reading of its key features. This article stays at the overview level; more detailed analysis and discussion appear in other posts.

1. Kyuubi Overview

1.1 What Is Kyuubi

  1. Kyuubi is a high-performance universal JDBC and SQL execution engine.
  2. Kyuubi relies on Apache Spark to provide high-performance data query capabilities. In addition, Kyuubi improves ad-hoc responsiveness through engine caching, and enhances concurrency through horizontal scaling and load balancing.
  3. Kyuubi’s vision is to build on top of Apache Spark and Data Lake technologies to unify the portal and become an ideal data lake management platform.

1.2 Architecture Overview

[Figure: Kyuubi architecture overview]

Key points to take away from the architecture diagram:

1.3 High Availability & Comparison with Other Middleware

  1. Kyuubi provides high availability and load balancing solutions based on Zookeeper, as shown in the following diagram.

[Figure: Kyuubi high availability via ZooKeeper]

  2. Kyuubi vs. Spark Thrift JDBC/ODBC Server (STS), compared in the figure below.

[Figure: Kyuubi vs. Spark Thrift JDBC/ODBC Server (STS) comparison]
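For the HA setup in item 1, the ZooKeeper-based service discovery boils down to a couple of kyuubi-defaults.conf entries. A sketch with placeholder hostnames, using the key names as I understand the Kyuubi 1.7 docs:

```properties
# kyuubi-defaults.conf -- register this server in ZooKeeper for HA (sketch)
kyuubi.ha.addresses=zk1:2181,zk2:2181,zk3:2181
kyuubi.ha.namespace=kyuubi
```

Clients then connect through the ZooKeeper ensemble instead of a fixed server address, and the quorum handles discovery and load balancing across Kyuubi instances.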

2. Spark 3 Features and the Enhancements Kyuubi Brings

2.1 Basic Features from the 1.7 Technical Manual

2.1.1 Deploying Kyuubi

Configuration takes effect from three places:

(1) kyuubi-defaults.conf: accepts all Hive primitive configurations and their Spark derivatives. Kyuubi treats these as system-wide defaults for every application it launches.

(2) hive-site.xml: copied from the Hive deployment's hive-site.xml; each Spark application automatically loads this file from its classpath. It has lower priority than kyuubi-defaults.conf.

(3) JDBC connection URL: Hive primitives or Spark derivatives can be passed directly in the JDBC connection string, overriding the defaults for each account user. Note that these settings take effect only once, at runtime when the engine bootstraps!
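For example, a connection that overrides a couple of Spark settings in the URL's variable section might look like this (host, port, and values are illustrative):

```
jdbc:hive2://kyuubi-host:10009/default;#spark.executor.memory=4g;spark.sql.shuffle.partitions=400
```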

2.1.2 Share Levels

[Figure: engine share levels]
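The share level is itself just a configuration entry, settable server-wide in kyuubi-defaults.conf or per connection; a sketch (USER is the default level, as far as I recall):

```properties
# kyuubi-defaults.conf -- one of CONNECTION / USER / GROUP / SERVER
kyuubi.engine.share.level=USER
```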

2.1.3 Kyuubi Engine TTL

[Figure: Kyuubi engine TTL configuration]
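The TTL is governed by the engine idle timeout; a sketch, assuming the 1.7 key name and its documented default:

```properties
# kyuubi-defaults.conf -- shut an engine down after 30 idle minutes
kyuubi.session.engine.idle.timeout=PT30M
```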

2.2 Dynamic Resource Allocation w/ External Shuffle Service

First, dynamic resource allocation. Spark itself already provides dynamic scaling of executors. The semantics of the relevant parameters are perfectly clear: they describe the minimum and maximum number of executors and the maximum idle time, and together they govern how executors are created and released.

[Figure: Dynamic Resource Allocation]

Once Kyuubi is introduced, combined with the share levels and the engine creation mechanism just described, drivers can be created on demand as well. Kyuubi also adds the parameter kyuubi.session.engine.idle.timeout, which stipulates how long a driver may sit idle before it is released, so Spark drivers are likewise created and released dynamically.
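So executor elasticity comes from Spark's DRA knobs while driver elasticity comes from Kyuubi's engine TTL; side by side, with illustrative values:

```properties
# spark-defaults.conf -- executors scale with load
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60s

# kyuubi-defaults.conf -- the driver (engine) is released when idle
kyuubi.session.engine.idle.timeout=PT30M
```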

2.3 Spark Adaptive Query Execution (AQE) & Best Practices for Applying AQE to Kyuubi

Example: [Figure: AQE case]

Take joins as an example. For an equi inner join, a large table joined with another large table uses a sort-merge join, while a large table joined with a small table uses a broadcast join. But the large/small judgment happens during SQL compilation and optimization, i.e. before the SQL executes.

Consider the following scenarios:

1. Two large tables are joined, with a filter condition attached. Once the filter has run, one side turns out to have shrunk to a small table that satisfies the broadcast-join condition. But because the execution plan was generated before the SQL ran, the join is still a sort-merge join, which introduces an unnecessary shuffle. This is exactly AQE's entry point: let part of the SQL run first, then go back and re-run the optimizer with the observed statistics.

2. The other typical scenario is data skew. Big data does not fear data volume; it fears skew. Ideally, performance scales linearly with horizontal hardware expansion, but once data is skewed, the result can be disastrous.

Again taking joins as an example, assume an equi inner join in which a few partitions are exceptionally large. Scenarios like this used to require rewriting the SQL: either pick the oversized keys out, process them separately, and union the results back together; or salt the join key, e.g. append a numeric suffix to scatter it, while keeping the semantics unchanged, which means replicating the matching rows of the other table to preserve the equi-join semantics and stripping the suffix off at the end. A sketch of the salting approach follows below.
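To make the manual workaround concrete, here is a minimal salting sketch in Scala, runnable in spark-shell (where spark.implicits._ is pre-imported); the tables, column names, and bucket count are all hypothetical:

```scala
import org.apache.spark.sql.functions._

val SALT_BUCKETS = 8

// Hypothetical inputs: "hot" is the skewed join key.
val left  = Seq(("hot", 1), ("hot", 2), ("cold", 3)).toDF("key", "v")
val right = Seq(("hot", "a"), ("cold", "b")).toDF("key", "w")

// Skewed side: scatter each key across SALT_BUCKETS random suffixes.
val saltedLeft = left.withColumn("salted_key",
  concat(col("key"), lit("_"), (rand() * SALT_BUCKETS).cast("int").cast("string")))

// Other side: replicate every row once per suffix so the equi-join
// semantics stay intact.
val saltedRight = right
  .withColumn("salt", explode(array((0 until SALT_BUCKETS).map(lit): _*)))
  .withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))

// "hot" now spreads over up to 8 partitions; AQE's skewJoin performs
// the equivalent splitting automatically, without any of this rewriting.
val joined = saltedLeft.join(saltedRight, "salted_key")
```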

As you can see, even the manual workaround follows a recognizable pattern, and Spark has automated that process: with AQE enabled in Spark 3, this kind of join skew is handled automatically. If your ETL jobs contain many large-table joins, the data quality is poor with severe skew, and there is no capacity to tune SQL statements one by one, then migrating from HiveQL to Spark SQL can bring a very significant performance improvement; 10x to 100x is no exaggeration!

2.4 Extension Capability via Spark's Extension API

Kyuubi provides KyuubiSparkSQLExtension, which uses the Extension API to add enhancements on top of the stock optimizer.

Some of the added rules are listed below:

[Figure: KyuubiSparkSQLExtension rules]

Take RepartitionBeforeWriteHive as a brief example. This rule addresses the small-files problem when writing to Hive.

In the Hive dynamic-partition write scenario, if in the final stage of the execution plan the DataFrame's partition distribution does not match the Hive table's partition layout, then at write time each task writes the data it holds into many Hive table partitions, generating a large number of small files. With the RepartitionBeforeWriteHive rule enabled, a Repartition operator keyed by the Hive table's partition columns is inserted before the write, which guarantees that the data for a given Hive partition is held by a single task and avoids producing masses of small files.

Kyuubi provides a switch for every rule; if you only want some of them enabled, consult the configuration docs and turn on just those.

KyuubiSparkSQLExtension ships as a jar. To use it, copy the jar into ${SPARK_HOME}/jars (or add it via the --jars command-line option), then enable KyuubiSparkSQLExtension and switch on specific features through configuration.
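A sketch of the wiring (the extension class name follows the Kyuubi docs; the per-rule key is my best reading of the configuration reference and should be verified against your version):

```properties
# spark-defaults.conf -- load Kyuubi's optimizer extension
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension
# example per-rule switch (assumed key; check the configuration docs)
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true
```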

2.5 Solution for Big Result Sets (this mode reduces performance)

Kyuubi supports an incremental collection mode. This feature is disabled by default; you can turn it on by setting kyuubi.operation.incremental.collect to true.

Note: this mode reduces performance! Enable it only when result sets are too large for the driver to hold at once.
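Besides the global default, the switch can also be flipped per connection in the JDBC URL or per session; for example, from beeline:

```sql
-- enable incremental collect for the current session only
SET kyuubi.operation.incremental.collect=true;
```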

2.6 Z-Ordering Support

To improve query speed, Kyuubi supports Z-Ordering to optimize the layout of data stored in all kinds of storage with various data formats.

It contains three parts:

Because it is implemented as an extension, z-order only works with Spark 3.1 and higher versions.
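As an illustration of the optimize statement the extension adds (syntax per my reading of the Kyuubi 1.7 docs; the table and column names are hypothetical):

```sql
-- rewrite the table's files so that rows are clustered
-- by the z-value of (c1, c2)
OPTIMIZE db.events ZORDER BY c1, c2;
```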

2.7 SQL Lineage Support

The current lineage parsing functionality is implemented as a plugin by extending Spark’s QueryExecutionListener.

The SparkListenerSQLExecutionEnd event is triggered after the SQL execution finishes and is captured by the QueryExecutionListener, where the lineage parsing process is performed on the successfully executed SQL.

The parsed lineage information is then written to the log file in JSON format.
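Enabling it amounts to registering the listener with Spark; a sketch, assuming the plugin jar is on the classpath and that I have the 1.7 listener class name right (treat it as an assumption and verify against the lineage plugin docs):

```properties
# spark-defaults.conf -- register the Kyuubi lineage listener (assumed class name)
spark.sql.queryExecutionListeners=org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener
```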

3. Parameter Configuration and Tuning

Based on Kyuubi 1.7 with Spark 3.2.1 on Hadoop 2.7, we adjusted the parameters as follows:

Basic YARN configuration (our Spark jobs run on YARN):

| File | Parameter | Value | Notes |
|---|---|---|---|
| yarn-site.xml | yarn.nodemanager.aux-services | mapreduce_shuffle,spark_shuffle | Auxiliary services run by the NodeManager; must include mapreduce_shuffle, and spark_shuffle is added here |
| yarn-site.xml | yarn.nodemanager.aux-services.spark_shuffle.class | org.apache.spark.network.yarn.YarnShuffleService | Spark shuffle service implementation class |
| yarn-site.xml | spark.shuffle.service.port | 7337 | external shuffle service port |
| yarn-site.xml | spark.yarn.shuffle.stopOnFailure | false | external shuffle service: whether to stop the NodeManager when the shuffle service fails to initialize |
| yarn-site.xml | spark.yarn.shuffle.service.metrics.namespace | sparkShuffleService | external shuffle service: metrics namespace |
| yarn-site.xml | spark.yarn.shuffle.service.logs.namespace | | external shuffle service: log namespace |
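Rendered as yarn-site.xml, the first rows look like this (excerpt; the remaining spark.yarn.shuffle.* rows follow the same pattern):

```xml
<!-- yarn-site.xml (excerpt) -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>
```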

Basic Spark configuration - shuffle and DRA:

| File | Parameter | Value | Notes |
|---|---|---|---|
| spark-defaults.conf | spark.shuffle.service.enabled | true | have clients use the external shuffle service |
| spark-defaults.conf | spark.shuffle.service.port | 7337 | port for connecting to the external shuffle service |
| spark-defaults.conf | spark.dynamicAllocation.enabled | true | enable dynamic resource allocation |
| spark-defaults.conf | spark.dynamicAllocation.executorIdleTimeout | 60s | how long an idle executor's container waits before being reclaimed |
| spark-defaults.conf | spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | reclaim timeout for executors holding cached data |
| spark-defaults.conf | spark.dynamicAllocation.initialExecutors | 2 | initial number of executors; defaults to minExecutors |
| spark-defaults.conf | spark.dynamicAllocation.maxExecutors | 60 | maximum number of executors allocated concurrently per application |
| spark-defaults.conf | spark.dynamicAllocation.minExecutors | 1 | minimum number of executors per application |
| spark-defaults.conf | spark.dynamicAllocation.executorAllocationRatio | 1 | ideally, one executor is requested per pending task, achieving maximum task parallelism; the side effect is that with many small tasks this wastes resources, since the newly requested executors may not even have started before the small tasks finish |
| spark-defaults.conf | spark.dynamicAllocation.schedulerBacklogTimeout | 1s | request new executors once pending tasks have been backlogged longer than this |
| spark-defaults.conf | spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | 1s | like the above, but for each subsequent request while the backlog persists |
| spark-defaults.conf | spark.dynamicAllocation.shuffleTracking.enabled | true | new in Spark 3: tracks shuffle files; with this enabled, executors holding shuffle data are not reclaimed |
| spark-defaults.conf | spark.dynamicAllocation.shuffleTracking.timeout | infinity | with shuffleTracking enabled, bounds how long executors holding shuffle data are kept; by default release relies on garbage collection of the shuffle state, and when GC lags, this timeout reclaims executors even if they still hold shuffle data |
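The same settings as a spark-defaults.conf excerpt, using the values from the table above:

```properties
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.initialExecutors=2
spark.dynamicAllocation.maxExecutors=60
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.cachedExecutorIdleTimeout=infinity
spark.dynamicAllocation.schedulerBacklogTimeout=1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s
spark.dynamicAllocation.executorAllocationRatio=1
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.shuffleTracking.timeout=infinity
```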

Basic Spark configuration - AQE:

| File | Parameter | Value | Notes |
|---|---|---|---|
| spark-defaults.conf | spark.sql.adaptive.enabled | true | enable AQE |
| spark-defaults.conf | spark.sql.adaptive.coalescePartitions.enabled | true | When true and spark.sql.adaptive.enabled is true, Spark coalesces contiguous shuffle partitions according to the target size (spark.sql.adaptive.advisoryPartitionSizeInBytes) to avoid too many small tasks. |
| spark-defaults.conf | spark.sql.adaptive.coalescePartitions.initialPartitionNum | 200 | The initial number of shuffle partitions before coalescing. By default it equals spark.sql.shuffle.partitions. Only takes effect when spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are both true. |
| spark-defaults.conf | spark.sql.adaptive.coalescePartitions.minPartitionNum | 200 | suggested minimum number of shuffle partitions after coalescing; if not set, defaults to the cluster's default parallelism |
| spark-defaults.conf | spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | The default is 64MB. Typically, when reading and writing data on HDFS, matching the HDFS block size (128MB or 256MB) is the best choice. |
| spark-defaults.conf | spark.sql.adaptive.localShuffleReader.enabled | true | when true and AQE is enabled, use the local shuffle reader when shuffle partitioning is not needed, e.g. after converting a sort-merge join to a broadcast join |
| spark-defaults.conf | spark.sql.adaptive.skewJoin.enabled | true | When true and spark.sql.adaptive.enabled is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions. |
| spark-defaults.conf | spark.sql.adaptive.skewJoin.skewedPartitionFactor | 10 | a partition is considered skewed if it is larger than this factor times the median partition size (and also larger than the threshold below) |
| spark-defaults.conf | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | size threshold below which a partition is never treated as skewed; should be larger than advisoryPartitionSizeInBytes |
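And the AQE block, likewise as a spark-defaults.conf excerpt:

```properties
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.initialPartitionNum=200
spark.sql.adaptive.coalescePartitions.minPartitionNum=200
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
```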

4. References

1. Kyuubi 1.7 technical manual