Impala性能优化要点:

1. 为数据存储选择合适的文件格式(如:Parquet)

  通常对于大数据量来说,Parquet文件格式是最佳的

2. 防止入库时产生大量的小文件(insert ... values会产生大量小文件,应该避免使用)

  在impala外生成数据时,最好是text格式或者Avro,这样你就可以逐行的构建文件,到了impala之后,再通过简单的insert ... select语句将其转换为Parquet格式

3. 根据实际的数据量大小选择合适的分区粒度

  合适的分区策略可以对数据进行物理拆分,在查询的时候就可以忽略掉无用数据,提高查询效率,通常建议分区数量在3万以下(太多的分区也会造成元数据管理的性能下降)

4. 为分区key选择最小的整数类型

  虽然使用string类型也可以作为分区key,因为分区key最后都是作为HDFS目录使用,但是使用最小的整数类型作为分区key可以降低内存消耗

5. 选择合适的Parquet块大小

  默认情况下,Impala的insert ... select语句创建的Parquet文件都是每个分区256M(在2.0之后改为1G了),通过Impala写入的Parquet文件只有一个块,因而只能被一个机器当作一个单元进行处理。如果在你的Parquet表中只有一个或者几个分区,或者一个查询只能访问一个分区,那么你的性能会非常慢,因为没有足够的数据来利用Impala并发分布式查询的优势。

6. 在追求性能或者大数据量查询的时候,要先获取所需要的表的统计指标(如:执行compute stats)
7. 减少传输到client端的数据量

我们可以通过如下方式来降低到客户端的数据量:

  • 聚合(如 count、sum、max等)
  • 过滤(如WHERE)
  • LIMIT
  • 结果集禁止使用美化格式进行展示(在通过impala-shell展示结果时,添加这些可选参数:-B、 --output_delimiter)
8. 确认查询是以高效的逻辑方式进行规划

  在执行之前使用EXPLAIN来查看逻辑规划,分析执行逻辑

9. 使用合适的操作系统设置

Impala Join性能优化

  Impala join查询最简单的优化手段就是通过使用compute stats来收集join中每张表的统计信息,然后由Impala根据表的大小、列的唯一值数目等来自动优化查询。为了更加精确地获取每张表的统计信息,每次表的数据变更时(如执行insertload dataadd partition、或drop partition等)都要重新执行一遍compute stats

  如果join查询中表的统计信息不全或者Impala选择的join顺序不是最优时,你可以在select [distinct 、all]之后指定straight_join来覆盖掉impala的join顺序如:

select straight_join x 
from medium join small join (select * from big where c1 < 10) as big
where medium.id = small.id 
    and small.id = big.id; 

select distinct straight_join x 
from medium join small join (select * from big where c1 < 10) as big
where medium.id = small.id 
    and small.id = big.id;

这样Impala就会使用查询语句中表的顺序来指导join的处理。

当使用STRAIGHT_JOIn技术时,你必须手动指定join查询中表的顺序而不是依赖于Impala优化器。Impala优化器使用特殊的手段来估算join中每个阶段的结果集大小,而对于手动指定顺序来说,可以根据如下方式开始,然后再手动调节来达到最优:

  • 首先指定最大的表,此表一般保存于磁盘中
  • 指定最小的表,第二张表、第三张表等等之后的表都是通过网络传输的,你需要对这些结果集进行裁剪处理以降低传输数据量
  • 指定次小表,再到次次小表等

例如:如果你的表的大小如下:BIGMEDIUMSMALLTINY,那么你的顺序应该如此:BIG join TINY join SMALL join MEDIUM

   Impala查询优化器根据表的绝对或者相对大小来选择不同技术来执行join查询。默认情况下是 broadcast join,右边的表都是小于左边的表,右边的表的内容会被分发到其他的查询节点中。

   另一种技术就是partitioned join,这种技术更加适用于大小差不多大的大表join,使用这种方式的话,每张表中的分区都会把数据分发到其他节点中,这样就可以这些数据子集就可以并发处理了。 broadcast或者partition join的选择是根据compute stats采集到的可用统计指标来衡量的。对于指定查询语句,可以通过执行EXPLAIN就可以查看选用的是哪个join策略。

统计指标不可用时join如何查询

   当join中表或者列的统计指标不可用时,Impala将无统计指标的表认为统计指标都为0,这些表都将作为右表处理。

通过EXPLAIN Plans和Query Profiles来了解Impala查询性能

  想要了解Impala查询的高性能注意事项,可以阅读查询的EXPLAIN输出,你可以获取EXPLAIN的执行计划,而无须真正的执行query。

  想要查看一个查询的物理性能特性的概览,可以在执行查询之后立马在impala-shell中执行SUMMARY命令,输出的信息中将展示哪个阶段耗时最多,以及每一阶段估算的内存消耗、行数与实际的差异。

  想要了解查询的详细性能特征,可以在执行查询之后立马在impala-shell中执行PROFILE命令,这些底层的信息包括内存、CPU、I/O以及网络消耗的详细信息,因此只能在一个真实的查询之后才可用。

使用EXPLAIN进行性能优化

  EXPLAIN语句概述了查询将执行的逻辑步骤,例如如何在节点间分配工作以及中间结果如何合并为最终结果, 这些你都可以在查询真正执行之前获得,你可以使用这些信息来检查查询是否会以某种非高效的方式执行。

[9-24-143-25:21000] > explain select ds,count(*) from t_ed_xxxx_newuser_read_feature_n group by ds order by ds;
Connection lost, reconnecting...
Query: explain select ds,count(*) from t_ed_xxxx_newuser_read_feature_n group by ds order by ds
+----------------------------------------------------------------------------------------------+
| Explain String                                                                               |
+----------------------------------------------------------------------------------------------+
| Max Per-Host Resource Reservation: Memory=9.94MB                                             |
| Per-Host Resource Estimates: Memory=27.00MB                                                  |
|                                                                                              |
| PLAN-ROOT SINK                                                                               |
| |                                                                                            |
| 05:MERGING-EXCHANGE [UNPARTITIONED]                                                          |
| |  order by: ds ASC                                                                          |
| |                                                                                            |
| 02:SORT                                                                                      |
| |  order by: ds ASC                                                                          |
| |                                                                                            |
| 04:AGGREGATE [FINALIZE]                                                                      |
| |  output: count:merge(*)                                                                    |
| |  group by: ds                                                                              |
| |                                                                                            |
| 03:EXCHANGE [HASH(ds)]                                                                       |
| |                                                                                            |
| 01:AGGREGATE [STREAMING]                                                                     |
| |  output: sum_init_zero(default.t_ed_xxxx_newuser_read_feature_n.parquet-stats: num_rows) |
| |  group by: ds                                                                              |
| |                                                                                            |
| 00:SCAN HDFS [default.t_ed_xxxx_newuser_read_feature_n]                                    |
|    partitions=372/372 files=2562 size=15.15GB                                                |
+----------------------------------------------------------------------------------------------+

自底向上读取EXPLAIN的输出:

  • 00阶段: 显示了底层的详细信息,如:扫描的表,表的分区数,文件数以及文件大小等信息,根据这些信息,你可以估算大概的耗时
  • 01阶段: 聚合操作SUM并行地在不同的节点上执行
  • 03阶段: 将01阶段的结果进行传输
  • 04阶段: 将SUM结果进行合并
  • 02阶段: 排序操作并行地在不同的节点中进行
  • 05阶段: 排序结果合并,并且输出

EXPLAIN也会在PROFILE结果的头部输出。

使用SUMMARY 报告进行性能调优

  SUMMARY命令可以输出每一阶段的耗时,可以快速地了解查询的性能瓶颈,与PROFILE输出一样,它只能在查询之后才可用,并且显示实际的时间消耗。SUMMARY输出也会在PROFILE的头部输出的显示。

 [9-24-143-25:21000] > select ds,count(*) from t_ed_xxxx_newuser_read_feature_n group by ds order by ds;
 [9-24-143-25:21000] > summary;
+---------------------+--------+----------+----------+-------+------------+----------+---------------+--------------------------------------------+
| Operator            | #Hosts | Avg Time | Max Time | #Rows | Est. #Rows | Peak Mem | Est. Peak Mem | Detail                                     |
+---------------------+--------+----------+----------+-------+------------+----------+---------------+--------------------------------------------+
| 05:MERGING-EXCHANGE | 1      | 3.20s    | 3.20s    | 372   | 372        | 0 B      | 0 B           | UNPARTITIONED                              |
| 02:SORT             | 51     | 517.22us | 2.54ms   | 372   | 372        | 6.02 MB  | 6.00 MB       |                                            |
| 04:AGGREGATE        | 51     | 1.75ms   | 7.85ms   | 372   | 372        | 2.12 MB  | 10.00 MB      | FINALIZE                                   |
| 03:EXCHANGE         | 51     | 2.91s    | 3.10s    | 2.44K | 372        | 0 B      | 0 B           | HASH(ds)                                   |
| 01:AGGREGATE        | 51     | 135.29ms | 474.62ms | 2.44K | 372        | 2.03 MB  | 10.00 MB      | STREAMING                                  |
| 00:SCAN HDFS        | 51     | 1.08s    | 2.58s    | 2.56K | 96.53M     | 1.05 MB  | 1.00 MB       | default.t_ed_xxxx_newuser_read_feature_n |
+---------------------+--------+----------+----------+-------+------------+----------+---------------+--------------------------------------------+
使用PROFILE进行性能分析

  PROFILE语句将产生一个关于最近一次查询的底层报告的详细信息展示。与EXPLAIN不同,这些信息只在查询完成之后才会生成,它显示了每个节点上的物理详细信息如:读取的字节数,最大内存消耗等。
你可以根据这些信息来确定查询时I/O密集型,还是CPU密集型,网络是否导致瓶颈,是否某些节点性能差但是其它节点性能好等信息。

2.13.1 什么是Impala?
Impala是cloudera提供的一款高效率的Sql查询工具,提供实时的查询效果,官方测试性能比Hive快10到100倍,其Sql查询比SparkSql还要更加快速,号称是当前大数据领域最快的查询Sql工具;
Impala是基于Hive并使用内存进行计算,兼顾数据仓库,具有实时,批处理,多并发等优点;

2.13.2 Impala与Hive的区别
(1)相同点
① Impala与Hive都是构建在Hadoop之上的数据查询工具各有不同的侧重适应面,但从客户端使用来看,Impala与Hive有很多的共同之处,如数据表元数据、ODBC/JDBC驱动、SQL语法、灵活的文件格式、存储资源池等;
(2)不同点
①执行计划:
1)Hive:依赖于MapReduce执行框架,执行计划分成map->shuffle->reduce->map->shuffle->reduce…的模型。如果一个查询被编译成多个MapReduce,则会有更多的写中间结果。由于MapReduce执行框架本身的特点,过多的中间过程会增加整个查询的执行时间;
2)Impala:把执行计划编译为一棵执行计划树,可以更加自然地分发执行计划到各个Impala服务(Impalad)中进行执行,而不用像Hive那样将执行计划组合成管道型的map->reduce模式,以此保证Impala有更好的并发性和避免不必要的中间排序与shuffle过程;
②内存使用
1)Hive:在执行过程中,如果内存放不下所有数据,则会使用磁盘进行存储。每一轮MapReduce结束,中间结果也会写入HDFS中,同样由于MapReduce执行架构的特性,shuffle过程也会有写本地磁盘的操作;
2)Impala:在遇到内存放不下所有数据的时候,则直接会报错,而不是使用磁盘进行存储,所以这也使得Impala目前处理查询会受到一定的限制,最好还是与Hive配合使用;
③调度
1)Hive:任务调度依赖于Hadoop的Yarn调度策略;
2)Impala:调度由自己完成,内部有一种调度器simple-schedule,它会尽量满足数据的局部性,扫描数据的进程尽量靠近数据本身所在的物理机器,从而减少网络IO、负载等因素的影响;
④容错
1)Hive:Hive依赖于Hadoop的容错能力;
2)Impala:在查询过程中,没有容错逻辑,如果在执行的过程中发生了故障,则直接返回错误,这与Impala设计有关,因为Impala定位是实时查询,一次查询失败,再查一次就好了,再查一次的成本很低;
⑤适用面
1)Hive:用于复杂的批处理查询任务和数据转换任务;
2)Impala:用于实时数据分析,但是不支持自定义UDF,所以能处理的问题有一定的限制,需要配合Hive一起使用,对Hive的结果数据集进行实时分析;

2.13.3 Impala架构

Impala主要由Impalad、State Store、Catalogd和CLI组成;
(1)Impalad
①Impalad(Impala server,Impala服务):可以部署多个不同机器上,通常与datanode部署在同一个节点 方便数据本地计算,负责具体执行本次查询sql的impalad称之为Coordinator。每个impala server都可以对外提供服务;
(2)State Store:主要是跟踪集群中impalad的健康状态及位置信息;
(3)Catalogd:作为metadata访问网关,负责跟Hive的metastore进行交互,同步Hive的元数据到Impala自己的元数据中;
(4)CLI:用户操作Impala的方式(Impala Shell、JDBC、HUE);

2.13.4 Impala查询处理过程
(1)Impalad分为Java前端与C++处理后端,接受客户端连接的Impalad即作为这次查询的 Coordinator(协调器),Coordinator通过JNI调用Java前端对用户的查询Sql进行分析生成执行计划树;
(2)Java前端产生的执行计划树以Thrift数据格式返回给C++后端(Coordinator);
(3)Coordinator根据执行计划树和数据存储信息,通过调度器(simple-schedule)对生成的执行计划树分配给相应的后端执行器Impalad执行,查询结果返回给Java前端;

2.13.5 Impala Sql与Hive Sql的区别

基本的语法跟Hive的查询语句大体一样;
Impala支持开窗函数;
Impala不支持cluster by、 distribute by、sort by;
Impala不支持分桶表;
Impala不支持collect_set(col)和explode(col)函数;
Impala不支持自定义UDF函数;

作者:admin  创建时间:2023-06-27 23:06
最后编辑:admin  更新时间:2024-04-07 15:40