1. 介绍一下什么是Hive

Hive 是基于 Hadoop的一个数据仓库工具,可以将HDFS中的数据文件映射为一张数据库表,并提供类SQL查询功能(HQL),提供快速开发的能力。Hive本质是将SQL转换为 MapReduce的任务进行运算,从而不必开发专门的MapReduce应用,减少开发人员的学习成本,功能扩展很方便。

拓展:

hive存的是和hdfs的映射关系,hive是逻辑上的数据仓库,实际操作的都是hdfs上的文件,HQL就是用sql语法来写的mr程序

2. Hive的架构原理

需要对 Hive 的架构有个大致的印象:

  • 用户接口Client:Hive可以通过CLI(Command-line Interface,即命令行),JDBC/ODBC( jdbc 访问 hive)、WEBUI(浏览器访问 hive)。
  • 元数据Metastore:Hive的元数据保存在数据库中,如保存在MySQL,SQLServer,PostgreSQL,Oracle及Derby等数据库中(默认是derby)。Hive中的元数据信息包含表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等。(其实就是sql表与hdfs文件之间的映射Path)
  • 驱动器Driver
    • 解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步一般都用第三方工具库完成,比如 antlr;Antlr定义SQL的语法规则,完成SQL词法、语法解析,将SQL转化为抽象语法树AST Tree
    • 编译器(Physical Plan):将 AST 编译生成逻辑执行计划。
    • 优化器(Query Optimizer):对逻辑执行计划进行优化。
    • 执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于 Hive 来说,就是 MR/Spark。

Hive 通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的 Driver,结合元数据(MetaStore),将这些指令翻译成 MapReduce,提交到 Hadoop 中执行,最后,将执行返回的结果输出到用户交互接口。

拓展:

这里有有个易混淆点,Hive 元数据默认存储在 derby 数据库,不支持多客户端访问,所以将元数据存储在 MySQL 等数据库,支持多客户端访问。

3. HiveSQL转换为MapReduce的过程

HiveSQL ->AST(抽象语法树) -> QB(查询块) ->OperatorTree(操作树)->优化后的操作树->mapreduce任务树->优化后的mapreduce任务树

过程描述如下:

  • SQL Parser(SQL解析器):Antlr定义SQL的语法规则,完成SQL词法、语法解析,将SQL转化为抽象语法树AST Tree;
  • Semantic Analyzer(语义分析):遍历AST Tree,抽象出查询的基本组成单元QueryBlock;
  • Logical plan(逻辑执行计划):遍历QueryBlock,翻译为执行操作树OperatorTree;
  • Logical plan optimizer(逻辑优化器): 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量;
  • Physical plan(物理执行计划):遍历OperatorTree,翻译为MapReduce任务;
  • Physical plan optimizer(物理优化器):物理层优化器进行MapReduce任务的变换,生成最终的执行计划。

4. hive和传统数据库之间的区别

ANSI SQL指标准化SQL

5. HiveSQL语句不会转化为MapReduce作业的情况

Fetch 抓取是指, Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如: SELECT * FROM employees;在这种情况下, Hive 可以简单地读取 employee 对应的存储目录下的文件,然后输出查询结果到控制台。

在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive
默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走mapreduce。

案例

把 hive.fetch.task.conversion 设置成 more, 然后执行查询语句, 如下查询方式都不会执行 mapreduce 程序。

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select \* from emp;

hive (default)> select ename from emp;

hive (default)> select ename from emp limit 3;

把 hive.fetch.task.conversion 设置成 none,然后执行查询语句,都会执行 mapreduce程序。

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select \* from emp;

hive (default)> select ename from emp;

hive (default)> select ename from emp limit 3;

6. 请简单介绍一下Hive的本地模式

大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况, Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置 hive.exec.mode.local.auto 的值为 true(该值默认为false),来让 Hive 在适当的时候自动启动这个优化。

set hive.exec.mode.local.auto=true;   *//开启本地 mr*

*//设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local   mr 的方式,默认*134217728,即 128M

set hive.exec.mode.local.auto.inputbytes.max=50000000;

*//设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默*

认为 4

set hive.exec.mode.local.auto.input.files.max=10;

7. 你使用过哪些 Hive 函数

(1)普通函数

(2)行转列函数和列转行函数

见第8题

(3)窗口函数

见第9题

8. 列转行和行转列函数有哪些

(1)行转列:把多行转成一列(多行变一行)

  • CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串,如果concat中任意字符串为null,则整个函数的返回结果为null。
  • CONCAT_WS(separator, str1, str2,…):一个特殊形式的 CONCAT()。第一个参数剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接的字符串之间。
    注意:CONCAT_WS must be “string or array[string]即concat_ws中的参数一定是字符串或字符串数组
  • COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生 Array 类型字段。
  • collect_list(col)将所有将结果放入,不去重,返回Array类型字段
    (2)列转行:把一列转成多行
  • EXPLODE(col):将 hive 一列中复杂的 Array 或者 Map 结构拆分成多行。
  • LATERAL VIEW:形成一张侧写表,它可以将原本的字段做一个关联。常和 split、explode 等 UDTF 一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。用法:
  • 注意:as不能省略
  • LATERAL VIEW udtf(expression) tableAlias AS columnAlias
    • 这个tableAias是侧写表的别名,里面只有colimnAlias这一个字段,注意:如果炸裂成两个列,则colimnAlias写两个别名

9. 介绍一下Hive中的窗口函数

窗口函数的语法

window\_function\_name(expression) 
OVER (
[partition\_defintion]
[order\_definition]
[frame\_definition]
)

首先需要指定窗口函数的函数名,也就是在上个例子中用的sum(),之后的OVER子句中即使没有内容,括号也需要保留,窗口由partition_defintion,order_definition,frame_definition确定,任何一个都不是必须的。

(1)partition_defintio 窗口分区

PARTITION BY expr [, expr] ...

根据表达式的计算结果来进行分区(列名也是一种表达式)。

(2)order_definition 窗口排序

ORDER BY expr [ASC|DESC] [, expr [ASC|DESC]] ...

为分区内的行的排列顺序。

(3)frame_definition 窗口框架

frame\_clause:
frame\_units frame\_extentframe\_units:
{ROWS | RANGE}frame\_extent:
{frame\_start | frame\_between}frame\_between:
BETWEEN frame\_start AND frame\_endframe\_start, frame\_end: {
CURRENT ROW
| UNBOUNDED PRECEDING
| UNBOUNDED FOLLOWING
| expr PRECEDING
| expr FOLLOWING}

解释:

  • PRECEDING:往前
  • FOLLOWING:往后
  • CURRENT ROW:当前行
  • UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点
  • UNBOUNDED FOLLOWING:表示到后面的终点

窗口框架的作用对分区进一步细分,frame_unit有两种,分别是ROWS和RANGE,ROWS通过指定当前行之前或之后的固定数目的行来限制分区中的行,RANGE按照排序列的当前值,根据相同值来确定分区中的行。

10. Hive内部表、外部表、分区表、分桶表的区别,以及各自的使用场景

  • 内部表

如果Hive中没有特别指定,则默认创建的表都是管理表,也称内部表。由Hive负责管理表中的数据,管理表不共享数据。删除管理表时,会删除管理表中的数据和元数据信息。

绝大多数表都是外部表; 只有自己使用的临时表,才是内部表。

  • 外部表

当一份数据需要被共享时,可以创建一个外部表指向这份数据。外部表数据由HDFS管理。删除该表并不会删除掉原始数据,删除的是表的元数据。当表结构或者分区数发生变化时,需要进行一步修复的操作。

场景

每天将收集到的网站日志定期流入 HDFS 文本文件。在外部表(原始日志表)的基础上做大量的统计分析,用到的中间表、结果表使用内部表存储,数据通过 SELECT+INSERT 进入内部表。

  • 分区表

分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。 Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。

分区表使用的是表外字段,需要指定字段类型,并通过关键字partitioned by(partition_name string)声名。

  • 分桶表

分桶使用的是表内字段,已经知道字段类型,不需要再指定。通过关键字 clustered by(column_name) into n buckets声明。分桶是更细粒度的划分、管理数据,可以对表进行先分区再分桶的划分策略

分桶最大的优势就是:用于数据取样,可以起到优化加速的作用。

*# 抽样查询*
select \* from stu\_buck tablesample(bucket 1 out of 4 on id);

分桶规则: Hive 的分桶采用对分桶字段的值进行哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。

经测试,如果设置reduce数量n>分桶表的通数m,测mapreduce会分出分m 和 (n-m)两部分 reduce task执行,其中(n-m)个reduce task应该执行的是空任务,m个reduce task执行分桶任务。如果n<m,则执行会报错。而当educe数量设置为-1后,分桶表分几个同,则有几个reduce。

11. Order By、Sort By、Distrbute By、Cluster By的区别

  • Order By(全局排序)

order by 会对输入做全局排序,因此只有一个reduce,也正因为只有一个 reducer,所以当输入的数据规模较大时,会导致计算的时间较长。(无论设置的reduce数量为多少,都只会有一个Reducer起作用,这样才能保证全局有序)

如果在HADOOP上进行order by全排序,会导致所有的数据集中在一台reducer节点上,然后进行排序,这样很可能会超过单个节点的磁盘和内存存储能力导致任务失败。

  • Sort By(分区的排序,即每个reducer有序)

Sort by 为每个 reducer 产生一个排序文件。每个 Reducer 内部进行排序,对全局结果集来说不是排序。

  • Distrbute By(控制进入分区)

在有些情况下,我们需要控制某个特定行应该到哪个 reducer ,通常是为了进行后续的聚集操作。distribute by 子句可以做这件事。distribute by类似 MR 中 partition(自定义分区),进行分区,结合 sort by 使用。

distribute by 的分区规则是根据分区字段的 hash 码与 reduce 的个数进行模除后,余数相同的分到一个区。

  • Cluster By

当 distribute by 和 sorts by字段相同时,可以使用 cluster by 方式代替。cluster by除了具有 distribute by 的功能外还兼具 sort by 的功能。但是排序只能是 升序 排序,不能像distribute by 一样去指定排序的规则为 ASC 或者 DESC

12. 动态分区和静态分区的区别及使用场景

关于动态分区在实际生产环境中的使用也是比较的多,所以这道题出现的频率也很高,但是不难。

  • 静态分区:

定义:对于静态分区,从字面就可以理解:表的分区数量和分区值是固定的。静态分区需要手动指定,列是在编译时期通过用户传递来决定的。

应用场景:需要提前知道所有分区。适用于分区定义得早且数量少的用例,不适用于生产。

  • 动态分区:

定义:是基于查询参数的位置去推断分区的名称,只有在 SQL 执行时才能确定,会根据数据自动的创建新的分区。

应用场景:有很多分区,无法提前预估新分区,动态分区是合适的,一般用于生产环境。

具体可参考Hive动态分区多种插入方式总结

13. Hive SQL语句的执行顺序

sql语句的执行顺序from-where-group by-having -select-order by -limit

(7)    SELECT

(8)    DISTINCT <select\_list>

(1)    FROM <left\_table>

(3)    <join\_type> JOIN <right\_table>

(2)    ON <join\_condition>

(4)    WHERE <where\_condition>

(5)    GROUP BY <group\_by\_list>

(6)    HAVING <having\_condition>

(9)    ORDER BY <order\_by\_condition>

(10)   LIMIT <limit\_number>

14. 请说明一下on和where的区别

on是在生成连接表的起作用的,where是生成连接表之后对连接表再进行过滤。

当使用left join时,无论on的条件是否满足,都会返回左表的所有记录,对于满足的条件的记录,两个表对应的记录会连接起来,对于不满足条件的记录,那右表字段全部是null。

当使用right join时,类似,只不过是全部返回右表的所有记录

当使用inner join时,功能与where完全相同

注意:where虽然也可用于两个表连接,但是如果这样的话,由于on没有连接条件,会产生笛卡尔积,因此一般将where用于on连接表之后再进行过滤。

另外,不得不提的是,将 hive.strict.checks.cartesian.product 设置为 true 时, 会限制笛卡尔积的查询。 对关系型数据库非常了解的用户可能期望在 执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将 WHERE 语句转化成那个 ON 语句。不幸的是, Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

15. 请你说明hql所有的优化方式

(1)小表大表 Join(MapJOIN)/相同的key过多/表连接时引发的数据倾斜

将 key 相对分散,并且数据量小的表放在 join 的左边,可以使用 map join 让小的维度表先进内存。在 map 端完成 join。

在Hive 0.11版本之前,如果想在Map阶段完成join操作,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小

如将a表放到Map端内存中执行,在Hive 0.11版本之前需要这样写:

select /\* +mapjoin(a) \*/ a.id , a.name, b.age 

from a join b 

on a.id = b.id;

如果想将多个表放到Map端内存中,只需在mapjoin()中写多个表名称即可,用逗号分隔,如将a表和c表放到Map端内存中,则 /* +mapjoin(a,c) */ 。

在Hive 0.11版本及之后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机:

hive.auto.convert.join=true 默认值为true,自动开启MAPJOIN优化。

hive.mapjoin.smalltable.filesize=2500000 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中。

(2)大表 Join 大表/空值引发的数据倾斜

注意:空值处理时,在非inner join的时候用,在inner join时在自动进行空值过滤。

有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key。

异常数据时,空KEY过滤

很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为空,

insert overwrite table jointable select n.\* from (select 
\* from nullidtable where id is not null) n left join bigtable o on n.id = 
o.id;

非异常数据时,空key转换

有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。由于null 值关联不上,处理后并不影响最终结果。

set mapreduce.job.reduces = 5;
insert overwrite table jointable
select n.\* from nullidtable n full join bigtable o on 
nvl(n.id,rand()) = o.id;

(3)Group By
默认情况下, Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。
两个参数:

  • hive.map.aggr=true:在map中会做部分聚集操作,效率更高但需要更多的内存。
  • hive.groupby.skewindata=true:数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

由上面可以看出起到至关重要的作用的其实是第二个参数的设置,它使计算变成了两个mapreduce,先在第一个中在 shuffle 过程 partition 时随机给 key 打标记,使每个key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上,所以需要第二次的mapreduce,这次就回归正常 shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。

(4) Count(Distinct) 去重统计

数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换,但是需要注意 group by 造成的数据倾斜问题

(5)笛卡尔积

尽量避免笛卡尔积, join 的时候不加 on 条件,或者无效的 on 条件(比如1=1),比如产生大量数据

(6)行列过滤

列处理:在 SELECT 中,只拿需要的列,如果有分区,尽量使用分区过滤,少用SELECT *。

行处理:当join时,尽量使用谓词下推技术。

通过执行计划(explain)查看,下面两个hql的执行计划是一样的,其中,第一个hql用到了系统的谓词下推优化技术,即当join的连接字段和where后面的条件字段都是一个字段时,Hive在执行时会先根据where后面的过滤条件过滤两个表,然后再进行join。但是,当写的sql比较长时,谓词下推可能会失效。

先关联两张表,再用 where 条件过滤

hive (default)> select o.id from bigtable b
join bigtable o on o.id = b.id
where o.id <= 10;

通过子查询后,再关联表(如果join的连接字段和where后面的条件字段不是一个字段时,可以这样做)

select b.id from bigtable b
join (select id from bigtable where id <= 10) o on b.id = o.id;

(7)分区

(8)分桶

16. 解决hive小文件过多问题

参考:https://mp.weixin.qq.com/s?\_\_biz=Mzg2MzU2MDYzOA==&mid=2247483683&idx=1&sn=14b25010032bdf0d375080e48de36d7f&scene=21#wechat\_redirect

(1) 小文件产生原因

hive 中的小文件肯定是向 hive 表中导入数据时产生,所以先看下向 hive 中导入数据的几种方式

  1. 直接向表中插入数据

    insert into table A values (1,'zhangsan',88),(2,'lisi',61);

    这种方式每次插入时都会产生一个文件,多次插入少量数据就会出现多个小文件,但是这种方式生产环境很少使用,可以说基本没有使用的

  2. 通过load方式加载数据

    load data local inpath '/export/score.csv' overwrite into table A  *-- 导入文件*
    load data local inpath '/export/score' overwrite into table A   *-- 导入文件夹*

    使用 load 方式可以导入文件或文件夹,当导入一个文件时,hive表就有一个文件,当导入文件夹时,hive表的文件数量为文件夹下所有文件的数量

  3. 通过查询方式加载数据

    insert overwrite table A  select s\_id,c\_name,s\_score from B;

    这种方式是生产环境中常用的,也是最容易产生小文件的方式。
    insert 导入数据时会启动 MR 任务,MR中 reduce 有多少个就输出多少个文件。
    所以, 文件数量=ReduceTask数量*分区数

也有很多简单任务没有reduce,只有map阶段,则文件数量=MapTask数量*分区数

每执行一次 insert 时hive中至少产生一个文件,因为 insert 导入时至少会有一个MapTask。
像有的业务需要每10分钟就要把数据同步到 hive 中,这样产生的文件就会很多。

(2)小文件过多产生的影响

HDFS 上每个文件都要在 NameNode 上创建对应的元数据,这个元数据的大小约为150byte,这样当小文件比较多的时候,就会产生很多的元数据文件,一方面会大量占用NameNode 的内存空间,另一方面就是元数据文件过多,使得寻址索引速度变慢。

小文件过多,在进行 MR 计算时,会生成过多切片,需要启动过多的 MapTask。每个MapTask 处理的数据量小,导致 MapTask 的处理时间比启动时间还小,白白消耗资源

(3)解决方法

(1)使用 hive 自带的 concatenate 命令,自动合并小文件

使用方法:

*#对于非分区表*

alter table A concatenate;

*#对于分区表*

alter table B partition(day=20201224) concatenate;

(2)在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS(数据源头)

(3)Hadoop Archive(存储方向)

通过HDFS的har归档文件进行归档,它将HDFS中一个个小文件归档成一个文件,对 NameNode 是一个整体,但是其内部实际上还是许多个小文件,减少了 NameNode 的内存。 具体看49题。

\# 下面是Hive中的相关参数

#用来控制归档是否可用

set hive.archive.enabled=true;

#通知Hive在创建归档时是否可以设置父目录

set hive.archive.har.parentdir.settable=true;

#控制需要归档文件的大小

set har.partfile.size=1099511627776;

#使用以下命令进行归档

ALTER TABLE A ARCHIVE PARTITION(dt='2020-12-24', hr='12');

#对已归档的分区恢复为原文件

ALTER TABLE A UNARCHIVE PARTITION(dt='2020-12-24', hr='12');

(4)CombineTextInputFormat(计算方向)

CombineTextInputFormat 用于将多个小文件在切片过程中生成一个单独的切片或者少量的切片,以减少切片的数量。

#执行Map前进行小文件合并

#CombineHiveInputFormat底层是 Hadoop的 CombineFileInputFormat 方法

#此方法是在mapper中将多个文件合成一个split作为输入

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 默认

#每个Map最大输入大小(这个值决定了合并后文件的数量)

set mapred.max.split.size=256000000;   -- 256M

#一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)

set mapred.min.split.size.per.node=100000000;  -- 100M

#一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)

set mapred.min.split.size.per.rack=100000000;  -- 100M

(5)开启 uber 模式,实现 JVM 重用(计算方向)

默认情况下,每个 Task 任务都需要启动一个 JVM 来运行,如果 Task 任务计算的数据量很小,我们可以让同一个 Job 的多个 Task 运行在一个 JVM 中,不必为每个 Task 都开启一个 JVM。

一个任务的JVM重用会一直开启,直到这个任务结束才会关闭重用的JVM

这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

(6)使用Sequence file

sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。 和 HAR 不同的是,这种方式还支持压缩。该方案对于小文件的存取都比较自由,不限制文件的多少,但是 SequenceFile 文件不能追加写入,适用于一次性写入大量小文件的操作。

SequenceFile 仅仅能解决小文件问题,但这种数据格式的执行时间挺慢的(具体看16题)

17. Hive数据倾斜问题

参考:https://mp.weixin.qq.com/s?\_\_biz=Mzg2MzU2MDYzOA==&mid=2247485154&idx=1&sn=cd7129544497c1a621e49dbc1d7ed5c3&scene=21#wechat\_redirect

MapReduce和Spark中的数据倾斜解决方案原理都是类似的,以下讨论Hive使用MapReduce引擎引发的数据倾斜,Spark数据倾斜也可以此为参照。

(1)表连接时引发的数据倾斜

排除空值后,如果表连接的键存在倾斜,那么在 Reduce阶段必然会引起数据倾斜。

解决方案

通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在Map阶段完成join操作,即MapJoin,从而减少了Reduce数据倾斜。

在Hive 0.11版本之前,如果想在Map阶段完成join操作,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小

如将a表放到Map端内存中执行,在Hive 0.11版本之前需要这样写:

select /\* +mapjoin(a) \*/ a.id , a.name, b.age 

from a join b 

on a.id = b.id;

如果想将多个表放到Map端内存中,只需在mapjoin()中写多个表名称即可,用逗号分隔,如将a表和c表放到Map端内存中,则 /* +mapjoin(a,c) */ 。

在Hive 0.11版本及之后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机:

hive.auto.convert.join=true 默认值为true,自动开启MAPJOIN优化。

hive.mapjoin.smalltable.filesize=2500000 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中。

(2)空值引发的数据倾斜

实际业务中有些大量的null值或者一些无意义的数据参与到计算作业中,表中有大量的null值,如果表之间进行join操作,这样所有的null值都会被分配到一个reduce中,必然产生数据倾斜。

解决方案

方法一:异常数据时,空KEY过滤

很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为空,

insert overwrite table jointable select n.\* from (select 

\* from nullidtable where id is not null) n left join bigtable o on n.id = 

o.id;

方法二:非异常数据时,空key转换

有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join 的结果中,此时我们可以表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。由于null 值关联不上,处理后并不影响最终结果。

set mapreduce.job.reduces = 5;
insert overwrite table jointable
select n.\* from nullidtable n full join bigtable o on 
nvl(n.id,rand()) = o.id;

(3)Group By 引发的数据倾斜

如果group by 维度过小, Map 阶段同一 Key 有大量的数据分发给一个 reduce,很容易发生倾斜了。

两个参数:

  • hive.map.aggr=true:在map中会做部分聚集操作,效率更高但需要更多的内存。
  • hive.groupby.skewindata=true:数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

由上面可以看出起到至关重要的作用的其实是第二个参数的设置,它使计算变成了两个mapreduce,先在第一个中在 shuffle 过程 partition 时随机给 key 打标记,使每个key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上,所以需要第二次的mapreduce,这次就回归正常 shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善,因此基本解决数据倾斜。

当然,也可以自己通过mapreduce程序来实现,即在 map 阶段将造成倾斜的key 先分成多组,例如 aaa 这个 key,map 时随机在 aaa 后面加上 1,2,3,4 这四个数字之一,把 key 先分成四组,先进行一次运算,之后再恢复 key 进行最终运算。

(4) Count(Distinct) 引发的数据倾斜

数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换,但是需要注意 group by 造成的数据倾斜问题

(5)不可拆分大文件引发的数据倾斜

当集群的数据量增长到一定规模,有些数据需要归档或者转储,这时候往往会对数据进行压缩;当对文件使用GZIP压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。

解决方案:

这种数据倾斜问题没有什么好的解决方案,只能将使用GZIP压缩等不支持文件分割的文件转为bzip2等支持文件分割的压缩方式。

所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法

18. 请介绍一下Hive的严格模式

Hive的严格模式可以通过设置防止一些危险操作:

(1)分区表不使用分区过滤

将 hive.strict.checks.no.partition.filter 设置为 true 时,对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围, 否则不允许执行。 换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。

(2)使用 order by 没有 limit 过滤

将 hive.strict.checks.orderby.no.limit 设置为 true 时,对于使用了 order by 语句的查询,要求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个Reducer 中进行处理(全局排序),强制要求用户增加这个 LIMIT 语句可以防止 Reducer 额外执行很长一段时间。

如果order by 后面用limit 2,相当于每个map只用两条数据,当数据多于两条时,将最小(或最大)哪一个扔掉,这样就减轻了reducer端执行负担。

(3)笛卡尔积

将 hive.strict.checks.cartesian.product 设置为 true 时, 会限制笛卡尔积的查询。 对关系型数据库非常了解的用户可能期望在 执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将 WHERE 语句转化成那个 ON 语句。不幸的是, Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

19. Hive索引有了解过吗

Hive支持索引(3.0版本之前),但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。并且Hive索引提供的功能很有限,效率也并不高,因此Hive索引很少使用。

  • 索引适用的场景:

适用于不更新的静态字段。以免总是重建索引数据。每次建立、更新数据后,都要重建索引以构建索引表。

  • Hive索引的机制如下:

hive在指定列上建立索引,会产生一张索引表(Hive的一张物理表),里面的字段包括:索引列的值、该值对应的HDFS文件路径、该值在文件中的偏移量。

Hive 0.8版本后引入bitmap索引处理器,这个处理器适用于去重后,值较少的列(例如,某字段的取值只可能是几个枚举值) 因为索引是用空间换时间,索引列的取值过多会导致建立bitmap索引表过大。

注意:Hive中每次有数据时需要及时更新索引,相当于重建一个新表,否则会影响数据查询的效率和准确性,Hive官方文档已经明确表示Hive的索引不推荐被使用,在新版本的Hive中已经被废弃了

扩展:Hive是在0.7版本之后支持索引的,在0.8版本后引入bitmap索引处理器,在3.0版本开始移除索引的功能,取而代之的是2.3版本开始的物化视图,自动重写的物化视图替代了索引的功能。

20. Hive有哪些方式保存元数据,各有哪些特点

Hive支持三种不同的元存储服务器,分别为:内嵌模式、本地模式、远程模式

(1)、内嵌模式:将元数据保存在本地内嵌的derby数据库中,内嵌的derby数据库不支持多客户端访问

(2)、本地模式:将元数据保存在本地独立的数据库中(一般是mysql),这可以支持多会话连接。

(3)、远程模式:把元数据保存在远程独立的mysql数据库中,避免每个客户端都去安装mysql数据库。

三种配置方式区别

  • 内嵌模式使用的是内嵌的Derby数据库来存储元数据,也不需要额外起Metastore服务。这个是默认的,配置简单,但是一次只能一个客户端连接,适用于用来实验,不适用于生产环境。
  • 本地元存储和远程元存储都采用外部数据库来存储元数据,目前支持的数据库有:MySQL、Postgres、Oracle、MS SQL Server。在这里我们使用MySQL。
  • 本地元存储和远程元存储的区别是:本地元存储不需要单独启动metastore服务,用的是跟hive在同一个进程里的metastore服务远程元存储需要单独启动metastore服务,然后每个客户端都在配置文件里配置连接到该metastore服务。远程元存储的metastore服务和hive运行在不同的进程。

metastore服务主要是用来连接mysql的

各有什么特点:

1.内存数据库derby,安装小,但是数据存在内存,不稳定

2.mysql数据库,数据存储模式可以自己设置,持久化好,查看方便。

21. 列式存储和行式存储

如图所示左边为逻辑表,右边第一个为行式存储,第二个为列式存储。

行存储的特点
查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。

列存储的特点

因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法

TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的;
ORC 和 PARQUET 是基于列式存储的。

22. Hive 中的压缩格式TextFile、SequenceFile、RCfile 、ORCfile、Parquet各有什么区别

(1)TextFile

默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大,可结合gzip、lzo、snappy、bzip2使用(系统自动检查,执行查询时自动解压),但是使用gzip、snappy,不会对数据进行切分,从而无法对数据进行并行操作。

TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的。

(2)SequenceFile

Hadoop 的 HDFS 和 MapReduce 子框架主要是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源(每一个小文件占用一个 Block,每一个 block 的元数据都存储在 namenode 的内存里)。解决办法通常是选择一个容器,将这些小文件组织起来统一存储。HDFS 提供了两种类型的容器,分别是 SequenceFileMapFile

MapFile 是排序后的 SequenceFile

SeqeunceFile支持两种格式的数据压缩,分别是:record compressionblock compression

record compression是对每条记录的value进行压缩

block compression是将一连串的record组织到一起,统一压缩成一个block

一般建议使用BLOCK压缩

TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的。

(3)RCFile
存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:
首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低;
其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;

(4)ORCFile
存储方式:数据按行分块,每块按照列存储。
它是rcfile的改良版本, 效率比rcfile高, 压缩快、快速列存取。

TEXTFILE 和 SEQUENCEFILE 的存储格式都是基于行存储的,RCFile和ORCFile每块按列存储。

每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe一般为 HDFS的块大小,每一个 stripe 包含多条记录,这些记录按照列进行独立存储,对应到 Parquet中的 row group 的概念。每个 Stripe 里有三部分组成,分别是 Index Data,Row Data,Stripe Footer

  • Index Data:一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引应该只是记录某行的各字段在 Row Data 中的 offset。
  • Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
  • Stripe Footer:存的是各个 Stream 的类型,长度等信息。

(5)Parquet

Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。

每个 Parquet 文件由 1 个或多个 Row Group组成。

  • 行组(Row Group):每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一个行组,类似于 orc 的 stripe 的概念。
  • 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
  • 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。

我们执行同样的SQL语句及同样的数据,只是数据存储格式不同,得到如下执行时长:

数据格式 CPU时间 用户等待耗时
TextFile 33分 171秒
SequenceFile 38分 162秒
Parquet 2分22秒 50秒
ORC 1分52秒 56秒

注:CPU时间:表示运行程序所占用服务器CPU资源的时间。
用户等待耗时:记录的是用户从提交作业到返回结果期间用户等待的所有时间。

查询TextFile类型的数据表耗时33分钟, 查询ORC类型的表耗时1分52秒,时间得以极大缩短,可见不同的数据存储格式也能给HiveSQL性能带来极大的影响。

23. Hive的UDF、UDAF、UDTF函数有什么区别

当 Hive 提供的内置函数无法满足你的业务处理需要时, 此时就可以考虑使用用户自定义函数(UDF:user-defined function)。

(1)UDF(User-Defined-Function)

单行进入,单行输出

(2)UDAF(User-Defined Aggregation Function)

聚集函数,多行进入,单行输出

(3)UDTF(User-Defined Table-Generating Functions)

一进多出

自定义函数的步骤:

 继承 Hive 提供的类
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
实现类中的抽象方法,主要是evaluate方法,用于实现业务逻辑
在 hive 的命令行窗口创建函数
*# 添加jar包*
add jar /opt/module/data/myudf.jar;
*# 创建临时函数与开发好的 java class 关联*
create temporary function my\_len as "com.layne.hive.MyStringLength";

24. Hive默认的分隔符是什么

把hive表格导出到本地时,系统默认的分隔符是^A即Ctrl+A,这个是特殊字符,直接cat或者vim是看不到的,用八进制编码\001表示。

25. hive中导入数据的几种方式

(1)向表中装载数据(Load)

加载本地文件到 hive

load data local inpath '/opt/module/hive/datas/student.txt' into table default.student;

加载 HDFS 文件到 hive 中

load data inpath '/user/layne/hive/student.txt' into table default.student;

加载数据覆盖表中已有的数据

load data inpath '/user/layne/hive/student.txt' overwrite overwrite table default.student;

(2)通过查询语句向表中插入数据(Insert)

基本插入数据

insert into table student\_par values(1,'wangwu'),(2,'zhaoliu');

基本模式插入(根据单张表查询结果)

insert overwrite table student\_par select id, name from student where month='201709';

insert into:以追加数据的方式插入到表或分区,原有数据不会删除
insert overwrite:会覆盖表中已存在的数据
注意:insert 不支持插入部分字段

多表(多分区)插入模式(根据多张表查询结果)

from student

insert overwrite table student partition(month='201707')

select id, name where month='201709'

insert overwrite table student partition(month='201706')

select id, name where month='201709'; 

(3)查询语句中创建表并加载数据(As Select)

根据查询结果创建表(查询的结果会添加到新创建的表中)

create table if not exists student3

as select id, name from student;

(4)创建表时通过 Location 指定加载数据路径

创建表,并指定在 hdfs 上的位置

create external table if not exists student5(

id int, name string

)

row format delimited fields terminated by '\t'

location '/student;

(5)Import 数据到指定 Hive 表中

一般配合export使用,做数据迁移时,先用 export 导出到HDFS后,再将数据导入

import table student2   

from '/user/hive/warehouse/export/student';

26. hive导出数据的几种方式

(1)Insert 导出

将查询的结果导出到本地(默认分隔符)

insert overwrite local directory '/opt/module/hive/data/export/student' select \* from student;

将查询的结果格式化导出到本地(指定分割符)

insert overwrite local directory '/opt/module/hive/data/export/student1'

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

select \* from student;

将查询的结果导出到 HDFS 上(没有 local)

insert overwrite directory '/user/layne/student2' 

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' select \* from student;

(2)Hadoop 命令导出到本地

hive (default)> dfs -get /user/hive/warehouse/student/student.txt

/opt/module/data/export/student3.txt; 

(3)Hive Shell 命令导出

基本语法:(hive -f/-e 执行语句或者脚本 > file)

[layne@hadoop102 hive]$ bin/hive -e 'select \* from default.student;' >

/opt/module/hive/data/export/student4.txt;

(4)Export 导出到 HDFS 上

export table default.student to '/user/hive/warehouse/export/student';

export 和 import 主要用于两个 Hadoop 平台集群之间 Hive 表迁移。

(5)Sqoop 导出

27. hive的执行计划有看过吗,你一般会关注哪几个点

参考:https://mp.weixin.qq.com/s?\_\_biz=Mzg2MzU2MDYzOA==&mid=2247484152&idx=1&sn=7e48aa4a9650481f960c6cac234977a4&chksm=ce77f429f9007d3f9cd813f5576c2e751716d903af42d7d6b598edb4aa3045e0967de7e1d5c9&scene=178&cur\_album\_id=1688731294790139905#rd

先看一遍上面的文章,下面是我摘录的重点

HIVE提供了EXPLAIN命令来展示一个查询的执行计划,这个执行计划对于我们了解底层原理

  • EXTENDED:加上 extended 可以输出有关计划的额外信息。这通常是物理信息,例如文件名。这些额外信息对我们用处不大
  • DEPENDENCY:dependency在EXPLAIN语句中使用会产生有关计划中输入的额外信息。它显示了输入的各种属性

我重点关注,分为几个阶段和各个阶段的依赖

  1. stage dependencies: 各个stage之间的依赖性
  2. stage plan: 各个stage的执行计划

重点看,

  • Filter Operator下的predicate过滤条件
  • Map Join Operator的condition map(即join方式)
  • Select Operator下的Statistics统计信息(包含表中数据条数,数据大小)【输入的时候】
  • Group By Operator下的Statistics统计信息(包含分组聚合之后的数据条数,数据大小等)
  • File Output Operator下的Statistics统计信息(文件输出的信息)

统计信息重点看数据量的多少,一个Task是否会发生数据倾斜。这样就能判断任务的内存或单个task的内存是否充足。

28. Hive的两张表关联,使用MapReduce怎么实现

看hadoop的44题

1)reduce join : 在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了区分两种来源的 key/value 数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件 File1,tag=2 表示来自文件 File2。 在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进行合并就 ok 了。

2)map join : Map side join 是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们就可以采用 DistributedCache的方法,具体看上一题(hadoop的43题):Hadoop 的缓存机制(Distributedcache)

29. 写出hive中split、coalesce及collect_list函数的用法

split将字符串转化为数组,即:split(‘a,b,c,d’ , ‘,’) ==> [“a”,“b”,“c”,“d”]。

coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL。

nvl只有两个参数,功能和coalesce一样。

collect_list列出该字段所有的值,不去重 => select collect_list(id) from table。

30. 使用过Hive解析JSON串吗

参考:https://mp.weixin.qq.com/s?\_\_biz=Mzg2MzU2MDYzOA==&mid=2247485175&idx=1&sn=63f58dc2946678d4e50eb6e7bb3ff745&chksm=ce77f026f900793026c2ef23a466a59160c8f20230c01621e9eecad8911a5db9140ddf54354a&scene=178&cur\_album\_id=1800525691487076353#rd

先看一遍上面那个。

下面是我的总结

解析一个josn字符串,可以用:

  1. get_json_object(json_string, ‘$.key’) 一次只能解析json中的一个字段
  2. json_tuple(json_string, k1, k2 …) 一次可以解析json中的多个字段

如果要解析一个json数组,可以:

  1. 通过regexp_replace、split、explode将json数组转化为多个json字符串
  2. 使用子查询的方式,结合json_tuple或get_json_object函数来解析json里面的字段

例如:

select json\_tuple(json, 'website', 'name') 
from (
select explode(split(regexp\_replace(regexp\_replace('[{"website":"baidu.com","name":"百度"},{"website":"google.com","name":"谷歌"}]', '\\[|\\]',''),'\\}\\,\\{','\\}\\;\\{'),'\\;')) 
as json) t;

select get\_json\_object(json, '$.website'),get\_json\_object(json, '$.name')
from (
select explode(split(regexp\_replace(regexp\_replace('[{"website":"baidu.com","name":"百度"},{"website":"google.com","name":"谷歌"}]', '\\[|\\]',''),'\\}\\,\\{','\\}\\;\\{'),'\\;'))
as json) t;

注意,不能json_tuple(explode(split(regexp_replace,因为explode是UDTF,UDTF函数不能写在别的函数内,也就是这里的explode函数不能写在json_tuple里面。所以要结合子查询方式

另外,也可以结合LATERAL VIEW解析具有对应关系的json数组,如下:

hive表中 goods_id 和 json_str 字段的内容如下:

goods_id json_str
1,2,3 [{“source”:“7fresh”,“monthSales”:4900,“userCount”:1900,“score”:“9.9”},{“source”:“jd”,“monthSales”:2090,“userCount”:78981,“score”:“9.8”},{“source”:“jdmart”,“monthSales”:6987,“userCount”:1600,“score”:“9.0”}]

31. 请说出Hive运行的三种引擎

Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。

Mr/tez/spark区别:

Mr引擎:多job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。

Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘 DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。

Tez引擎:完全基于内存。 注意:如果数据量特别大,慎重使用。容易OOM。一般用于快速出结果,数据量比较小的场景。

32.基本概念

基本概念一般会以问答题的方式进行考察,比如在面试的时候直接问:说说你对Hive的理解?Hive的作用有哪些?这种类似的问题

1)说说你对Hive的理解

​ 从概念上讲,Hive是一款开源的基于hadoop的用于统计海量结构化数据的一个​​数据仓库​​,它定义了简单的类似SQL的查询语言,称为HQL,允许熟悉SQL的用户查询数据。

从本质上讲:Hive是将HQL语句转换成MapReduce程序的一个工具。

2)什么是数据仓库

数据仓库的概念是在20世纪90年代被提出来,初衷是专门为业务分析建立一个数据中心,解决因为数据太多查询效率低下的问题。一个被广泛接受的定义是:数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化(Time Variant)的数据集合,用于支持管理决策(Decision Making Support)。

3)简单说说MapReduce

MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在大规模集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。MapReduce的思想就是“分而治之”,Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理;Reducer负责对map阶段的结果进行汇总。

4)Hive的作用有哪些

可以将结构化的数据文件映射成一张表,并提供类SQL查询功能,方便非java开发人员对hdfs上的数据做 MapReduce 操作;

可以对数据提取转化加载(ETL)

构建数据仓

5)Hive的使用场景

即席查询:利用CLI或者类似Hue之类的工具,可以对Hive中的数据做即席查询,如果底层的引擎使用的是MapReduce耗时会很久,可以替换成Tez或者Spark;

离线的数据分析:通过执行定时调度或者脚本去执行HQL语句,并将结果保存;

构建数仓时用于组织管理数据库和表。

33.架构

架构这一块主要考察Hive的基本组成,也可以针对具体的部分进行进一步考察。

1)Hive的构成包括哪些部分?

1.用户接口层:常用的三个分别是CLI,JDBC/ODBC 和 WUI。其中最常用的是CLI,CLI启动的时候,会同时启动一个Hive副本。JDBC/ODBC是Hive的客户端,用户通过客户端连接至Hive Server。在启动客户端模式的时候,需要指出Hive Server所在节点,并且在该节点启动Hive Server。WUI是通过浏览器访问Hive。

2.元数据存储:Hive将元数据存储在RDBMS中,有三种模式可以连接到数据库,分别是内嵌式元存储服务器、本地元存储服务器、远程元存储服务器。

3.Driver(Compiler/Optimizer/Executor)

Driver完成HQL查询语句的词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS上,并由MapReduce调用执行。

2)Hive有哪些方式保存元数据,各有什么特点?

Hive支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器,每种存储方式使用不同的配置参数。

内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby是内嵌式元存储的默认数据库。

在本地模式下,每个Hive客户端都会打开到数据存储的连接并在该连接上请求SQL查询。

在远程模式下,所有的Hive客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用Thrift协议通信。

3)Hive QL语句是怎么执行的?

整个过程的执行步骤如下:

(1) 解释器完成词法、语法和语义的分析以及中间代码生成,最终转换成抽象语法树;

(2) 编译器将语法树编译为逻辑执行计划;

(3) 逻辑层优化器对逻辑执行计划进行优化,由于Hive最终生成的MapReduce任务中,Map阶段和Reduce阶段均由OperatorTree组成,所以大部分逻辑层优化器通过变换OperatorTree,合并操作符,达到减少MapReduce Job和减少shuffle数据量的目的;

(4) 物理层优化器进行MapReduce任务的变换,生成最终的物理执行计划;

(5) 执行器调用底层的运行框架执行最终的物理执行计划。

34.数据组织

数据组织主要考察面试者对Hive的数据库、表、视图、分区和表数据的概念的考察,清楚的说出每个概念的含义就可以了。

1)Hive的存储结构包括哪些?可以具体说说每种结构吗?

包括数据库、表、分区、桶、视图和表数据。

database-数据库在 HDFS 中表现为指定的目录下的一个文件夹,通过${hive.metastore.warehouse.dir}可以进行设置;

table-内部表在 HDFS 中表现为某个 database 目录下一个文件夹,默认创建的都是内部表;

external table-外部表与内部表类似,在 HDFS 中表现为指定目录下一个文件夹;

bucket-桶在 HDFS 中表现为同一个表目录或者分区目录下根据某个字段的值进行 hash 散列之后的多个文件;

view-视图与表类似,只读,基于基本表创建,不占存储空间,实际是一连串的查询语句;

表数据对应 HDFS 对应目录下的文件。

2)内部表和外部表的区别吗?

内部表数据由Hive自身管理,外部表数据由HDFS管理;删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除。

3)说说分区表和分桶表的区别?

在Hive中,分区表(Partitioned Table)和分桶表(Bucketed Table)是用于优化数据查询和管理的两种不同的表结构。

分区表:

分区表是根据一个或多个列的值将数据划分为不同的分区。每个分区都是一个独立的子目录,其中包含特定分区的数据文件。
分区表的分区可以基于时间、地区、类别或其他相关字段。这样的分区设计可以提高查询性能,因为只需扫描特定分区的数据而不是整个表。
分区表的分区信息存储在Hive的元数据中,可以通过分区字段进行过滤和查询。
分桶表:

分桶表是将数据划分为固定数量的桶(Bucket),每个桶中存放一部分数据。分桶是通过对数据的哈希函数计算得到的,可以基于一个或多个列进行哈希计算。
分桶表的桶数通常是在创建表时指定的,桶的数量应该根据数据的大小和查询需求进行合理的选择。
分桶表可以在数据分布均匀的情况下提高查询性能,因为查询时只需扫描特定桶的数据而不是整个表。
区别:

分区表是按照列值划分数据,而分桶表是通过哈希函数计算对数据进行划分。
分区表的分区信息存储在Hive的元数据中,而分桶表的桶信息存储在数据文件的元数据中。
分区表的分区可以动态添加或删除,而分桶表的桶数在创建表时指定且不可更改。
分区表适用于按照特定列进行数据过滤和查询,而分桶表适用于对数据进行随机访问和均匀分布的查询。
综上所述,分区表和分桶表是用于不同场景下的数据管理和查询优化。选择使用哪种表结构取决于数据的特点、查询需求和性能优化的目标。

35.数据倾斜

数据倾斜不仅在Hive面试中会被问到,其他只要涉及到大规模程序开发的组件都会问到数据倾斜方面的问题,因为这是在实际工作中经常会出现的问题,如何去避免和解决出现的数据倾斜问题是衡量你代码水平高低的尺子。

1)什么是数据倾斜?

数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据倾斜。

2)你知道发生数据倾斜的原因吗?

发生数据倾斜的原因有很多,大致可以归为:

1)key分布不均匀;

2)数据本身的特性,原本按照日期进行分区,如果在特定日期数据量剧增,就有可能造成倾斜;

3)建表时考虑不周,分区设置不合理或者过少;

4)某些 HQL 语句本身就容易产生数据倾斜,如 join。

3)哪些HQL操作可能会发生数据倾斜?

对照上面的表格,可以得出有三种情况可能会发生数据倾斜:

1)join

大小表join的时候,其中一个较小表的key集中,这样分发到某一个或者几个的Reduce上的数据就可能远高于平均值;

两张大表join的时候,如果有很多0值和空值,那么这些0值或者空值就会分到一个Reduce上进行处理;

join的时候,不同数据类型进行关联,发生类型转换的时候可能会产生null值,null值也会被分到一个Reduce上进行处理;

2)group by

进行分组的字段的值太少,造成Reduce的数量少,相应的每个Reduce的压力就大;

3)count distinct

count distinct的时候相同的值会分配到同一个Reduce上,如果存在特殊的值太多也会造成数据倾斜。

36.HIVE优化

1)谈谈如何对join操作进行优化?

join优化是个复杂的问题,可以从以下几点进行优化:

1)小表前置

大小表在join的时候,应该将小表放在前面,Hive在解析带join的SQL语句时,会默认将最后一个表作为大表,将前面的表作为小表并试图将它们读进内存。如果表顺序写反,大表在前面,可能会引发OOM。

2)key值相同

多表join的时候尽量使用相同的key来关联,这样会将会将多个join合并为一个MR job来处理。

3)利用map join特性

map join特别适合大小表join的情况。Hive会将大表和小表在map端直接完成join过程,消灭reduce,效率很高。Hive 0.8版本之前,需要加上map join的暗示,以显式启用map join特性,具体做法是在select语句后面增加/+mapjoin(需要广播的较小表)/。

map join的配置项是hive.auto.convert.join,默认值true;还可以控制map join启用的条件,hive.mapjoin.smalltable.filesize,当较小表大小小于该值就会启用map join,默认值25MB。

2)对于空值或者无意义的值引发的数据倾斜,该怎么处理呢?

这在写程序的时候要考虑清楚,这些异常值的过滤会不会影响计算结果,如果影响那就不能直接过滤掉,可以将这些异常的key用随机方式打散,例如将用户ID为null的记录随机改为负值。

3)如何调整mapper数?

mapper数量与输入文件的split数息息相关,可以通过设置相关参数来调整mapper数。

1)可以直接通过参数mapred.map.tasks(默认值2)来设定mapper数的期望值,但它不一定是最终mapper数;

2)输入文件的总大小为total_input_size。HDFS中,一个块的大小由参数dfs.block.size指定,默认值64MB或128MB。所以得出来的默认mapper数就是:

default_mapper_num = total_input_size / dfs.block.size,但是它也不一定是最终的mapper数;

3)设置参数mapred.min.split.size(默认值1B)和mapred.max.split.size(默认值64MB)分别用来指定split的最小和最大值。那么split大小和split数计算规则是:

split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size));

split_num = total_input_size / split_size。

4)最终得出mapper数:

mapper_num = MIN(split_num, MAX(default_mapper_num, mapred.map.tasks))。

其中可变的参数有:mapred.map.tasks、dfs.block.size(不会为了一个程序去修改,但是也算是一个可变参数)、mapred.min.split.size、mapred.max.split.size,通过调整他们来实现,mapper数的变化。

4)如何调整reducer数?

利用参数mapred.reduce.tasks可以直接设定reducer数量,不像mapper一样是期望值。如果不设这个参数的话,Hive就会自行推测,逻辑如下:

1)参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量。

2)参数hive.exec.reducers.max用来设定每个job的最大reducer数量。

3)reducer数:

reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。

reducer数量决定了输出文件的数量。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢执行时间也有可能造成OOM。

5)什么时候又需要合并文件?如何合并小文件?

当有很多小文件的时候没需要合并小文件,可以在输入阶段合并,也可以在输出阶段合并。

1)输入阶段合并

要想文件自动合并,需要更改Hive的输入文件格式,通过参数hive.input.format来更改,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,需要改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。还需要设置mapred.min.split.size.per.node和mapred.min.split.size.per.rack这两个参数,他们的含义是单节点和单机架上的最小split大小。设置完后,如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。

2)输出阶段合并

设置hive.merge.mapfiles为true可以将map-only任务的输出合并;

设置hive.merge.mapredfiles为true可以将map-reduce任务的输出合并。另外,设置hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,一旦低于这个阈值,就会启动一个任务来进行合并。

37.Hive 表关联查询,如何解决数据倾斜的问题?

1.倾斜原因:map 输出数据按 key Hash 的分配到 reduce 中,由于 key 分布不均匀、业务数据本身的特、建表时考虑不周、等原因造成的 reduce 上的数据量差异过大。

1)key 分布不均匀;

2)业务数据本身的特性;

3)建表时考虑不周;

4)某些 SQL 语句本身就有数据倾斜;

如何避免:对于 key 为空产生的数据倾斜,可以对其赋予一个随机值。

2.解决方案

1)参数调节:

hive.map.aggr = true;

hive.groupby.skewindata=true;

有数据倾斜的时候进行负载均衡,当选项设定位 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的,Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

2)SQL 语句调节:

① 选用 join key 分布最均匀的表作为驱动表。做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果。

② 大小表 Join:使用 map join 让小的维度表(1000 条以下的记录条数)先进内存。在map 端完成 reduce。

③ 大表 Join 大表:把空值的 key 变成一个字符串加上随机数,把倾斜的数据分到不同的reduce 上,由于 null 值关联不上,处理后并不影响最终结果。④ count distinct 大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果是计算 count distinct,可以不用处理,直接过滤,在最后结果中加 1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

38.Hive 的 HSQL 转换为 MapReduce 的过程?

HiveSQL ->AST(抽象语法树) -> QB(查询块) ->OperatorTree(操作树)->优化后的操作树->mapreduce 任务树->优化后的 mapreduce 任务树

过程描述如下:

SQL Parser:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将SQL 转化为抽象语法树 AST Tree;

Semantic Analyzer:遍历 AST Tree,抽象出查询的基本组成单元QueryBlock;

Logical plan:遍历 QueryBlock,翻译为执行操作树 OperatorTree;

Logical plan optimizer: 逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle数据量;

Physical plan:遍历 OperatorTree,翻译为 MapReduce 任务;

Logical plan optimizer:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

详细中文描述:
在Hive中,将Hive SQL(HSQL)语句转换为MapReduce的过程如下:

解析:Hive将接收到的HSQL语句进行解析,识别出语句的类型(例如SELECT、INSERT、JOIN等)以及相关的关键字、表名、列名等信息。

逻辑优化:在逻辑优化阶段,Hive会对HSQL语句进行优化,包括查询重写、谓词下推、列剪裁等操作。这些优化旨在提高查询性能和减少数据的传输量。

逻辑计划生成:在生成逻辑计划阶段,Hive将优化后的HSQL语句转换为逻辑查询计划。逻辑计划是一个抽象的查询计划,描述了查询的逻辑流程和操作顺序。

物理优化:在物理优化阶段,Hive会对逻辑计划进行物理优化,选择合适的数据访问路径、操作顺序和算法等。这些优化旨在在MapReduce执行时提高查询性能。

MapReduce任务生成:一旦物理优化完成,Hive会将优化后的逻辑计划转换为一系列的MapReduce任务。每个任务负责处理一部分数据,并生成部分结果。

数据读取和处理:在执行阶段,Hive将读取输入数据并对其进行处理。数据可能会从HDFS或其他存储引擎中读取,并经过映射、过滤、聚合等操作。

MapReduce任务执行:Hive将生成的MapReduce任务提交给Hadoop集群进行执行。这些任务将在集群的计算节点上并行执行,以处理数据和生成最终结果。

结果输出:最后,Hive将处理后的结果写入指定的输出位置,可以是HDFS、数据库表或其他存储系统。

总体而言,HSQL语句的转换为MapReduce涉及解析、优化、计划生成、任务生成、数据处理和结果输出等阶段。这些阶段的具体实现取决于Hive的版本和配置。

在Hive中,以下是对应的概念解释:

SQL Parser:SQL Parser用于解析输入的SQL语句,将其转换为内部数据结构(如抽象语法树)以供后续处理。

Semantic Analyzer:Semantic Analyzer(语义分析器)是Hive中的组件,负责验证SQL语句的语义正确性。它检查表和列的存在性、数据类型的匹配性、权限等,并生成查询的元数据信息。

AST Tree:AST(Abstract Syntax Tree)Tree是语义分析器生成的抽象语法树,它以树状结构表示SQL语句的语法结构,包括语句类型、表名、列名、连接关系等。

QueryBlock:QueryBlock(查询块)是Hive中的概念,表示SQL语句中的一个查询单元。它可以包含子查询、连接操作、聚合操作等,用于组织和处理复杂的查询语句。

OperatorTree:OperatorTree指的是逻辑查询计划的表示形式,它以树状结构表示查询的逻辑操作符和数据流。每个操作符代表一个数据处理操作,例如过滤、投影、连接等。

Logical plan optimizer:逻辑计划优化器是Hive中的组件,用于对逻辑查询计划进行优化。它可以选择最优的操作顺序、应用谓词下推、列剪裁等优化策略,以提高查询性能。

ReduceSinkOperator:ReduceSinkOperator是物理查询计划中的一种操作符,用于将数据重新分区和排序,以便进行后续的Reduce操作。

Physical plan:物理计划是逻辑计划经过物理优化后生成的查询执行计划。它定义了如何在MapReduce或其他底层计算引擎上执行查询,包括数据读取、操作顺序、数据传输等细节。

总的来说,这些概念在Hive中扮演着不同的角色,负责解析SQL语句、验证语义、优化查询计划,并最终转换为物理执行计划,以实现高效的数据查询和处理。

39.Hive 底层与数据库交互原理?

由于 Hive 的元数据可能要面临不断地更新、修改和读取操作,所以它显然不适合使用 Hadoop 文件系统进行存储。目前 Hive 将元数据存储在 RDBMS 中,比如存储在 MySQL、Derby 中。元数据信息包括:存在的表、表的列、权限和更多的其他信息。

40.Hive 的两张表关联,使用 MapReduce怎么实现?

如果其中有一张表为小表,直接使用 map 端 join 的方式(map 端加载小表)进行聚合。如果两张都是大表,那么采用联合 key,联合 key 的第一个组成部分是 join on 中的公共字段,第二部分是一个 flag,0 代表表 A,1 代表表 B,由此让Reduce 区分客户信息和订单信息;在 Mapper 中同时处理两张表的信息,将join on 公共字段相同的数据划分到同一个分区中,进而传递到一个 Reduce中,然后在 Reduce 中实现聚合。

41.请说明 hive 中 Sort By,Order By,Cluster By,Distrbute By各代表什么意思?

order by:会对输入做全局排序,因此只有一个 reducer(多个 reducer 无法保证全局有序)。只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间。

sort by:不是全局排序,其在数据进入 reducer 前完成排序。

distribute by:按照指定的字段对数据进行划分输出到不同的 reduce 中。

cluster by:除了具有 distribute by 的功能外还兼具 sort by 的功能。

42.hive中 DISTRIBUTE BY 和 group by 在聚合操作时的区别?

  1. DISTRIBUTE BY:DISTRIBUTE BY是用于在查询过程中指定分发列,将数据发送到不同的Reducer任务中进行并行处理。DISTRIBUTE BY只负责数据的分发,不会进行聚合操作。它可以确保具有相同分发列值的数据被发送到同一个Reducer任务中,从而提高并行处理性能。

  2. GROUP BY:GROUP BY是用于在查询中进行分组和聚合操作的关键字。它按照指定的列对数据进行分组,并对每个组进行聚合操作(如SUM、COUNT、AVG等)。GROUP BY会将数据发送到不同的Reducer任务中,每个任务负责处理一个或多个组的数据。GROUP BY会对每个组进行聚合操作,最后返回每个组的聚合结果。

区别总结:

DISTRIBUTE BY负责数据的分发,不进行聚合操作;
GROUP BY负责数据的分组和聚合操作;
DISTRIBUTE BY将数据发送到不同的Reducer任务中进行并行处理;
GROUP BY将数据发送到不同的Reducer任务中,每个任务负责处理一个或多个组的数据,并对每个组进行聚合操作。
示例: 假设有一个销售数据表,包含销售日期、产品类别、销售额等字段。如果需要按照产品类别进行聚合操作,可以使用GROUP BY进行分组和聚合。示例语句如下:

sql
SELECT product_category, SUM(sales_amount) 
FROM sales_table 
GROUP BY product_category;

这样,数据将按照产品类别进行分组,并对每个组进行销售额的求和聚合。GROUP BY会将数据发送到不同的Reducer任务中,每个任务负责处理一个或多个组的数据。

43.hive中什么情况下要用Distribute By,列举现实案例?

在Hive中,使用Distribute By可以在以下情况下提高查询性能和并行处理能力:

聚合操作:当进行聚合操作(如SUM、COUNT、AVG等)时,使用Distribute By可以将具有相同分发列值的数据发送到同一个Reducer任务中,从而在并行处理中提高性能。

连接操作:在进行连接操作(如JOIN)时,使用Distribute By可以确保连接所需的数据在同一个Reducer任务中进行处理,从而提高连接操作的性能和效率。

数据倾斜:当数据倾斜问题出现时,使用Distribute By可以将倾斜的数据均匀地分发到不同的Reducer任务中,从而减少某个Reducer任务的负载,提高整体查询性能。

有序输出:如果需要按照特定列对查询结果进行有序输出,可以使用Distribute By将数据分发到不同的Reducer任务中,然后在每个Reducer任务中使用Order By进行局部排序,最后将结果合并得到有序的输出。

举例: 假设有一个销售数据表,包含销售日期、产品类别、销售额等字段。如果需要按照产品类别进行聚合操作,可以使用Distribute By将相同产品类别的数据发送到同一个Reducer任务中进行聚合,从而提高性能。示例语句如下:

SELECT product_category, SUM(sales_amount) 
FROM sales_table 
DISTRIBUTE BY product_category;

这样,相同产品类别的数据将被发送到同一个Reducer任务中进行聚合操作。

44. 写出 hive 中 split、coalesce 及 collect_list 函数的用法(可举例)?

split 将字符串转化为数组,即:split(‘a,b,c,d’ , ‘,’) ==> [“a”,“b”,“c”,“d”]。

coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回 NULL。

collect_list 列出该字段所有的值,不去重 => select collect_list(id) fromtable。

45. Hive 有哪些方式保存元数据,各有哪些特点?

Hive 支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器,每种存储方式使用不同的配置参数。内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby 是内嵌式元存储的默认数据库。在本地模式下,每个 Hive 客户端都会打开到数据存储的连接并在该连接上请求 SQL 查询。在远程模式下,所有的 Hive 客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用 Thrift 协议通信。

46.Hive 中的压缩格式 TextFile、SequenceFile、RCfile、ORCfile各有什么区别?

1、TextFile

默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销****大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,压缩后的文件不支持 split,Hive 不会对数据进行切分,从而无法对数据进行并行操作。并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比 SequenceFile 高几十倍。

2、SequenceFile

SequenceFile 是 Hadoop API 提供的一种二进制文件支持,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。

SequenceFile 支持三种压缩选择:NONE,RECORD,BLOCK。Record 压缩率低,一般建议使用 BLOCK 压缩。优势是文件和 hadoop api 中的 MapFile 是相互兼容的

3、RCFile

存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低;其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;

4、ORCFile

存储方式:数据按行分块 每块按照列存储。

压缩快、快速列存取。

效率比 rcfile 高,是 rcfile 的改良版本。

47.所有的 Hive 任务都会有 MapReduce 的执行吗?

不是,从 Hive0.10.0 版本开始,对于简单的不需要聚合的类似 SELECT from LIMIT n 语句,不需要起 MapReduce job,直接通过 Fetch task获取数据。

48.hive中 Fetch task 是什么意思

在Hive中,Fetch Task是一种任务类型,用于将查询结果从执行引擎(如MapReduce或Tez)发送给客户端。Fetch Task负责将查询结果从数据存储层(如HDFS)读取并传输给客户端,以便客户端可以获取和展示查询的结果。

当执行一个查询时,Hive将查询分解为一系列的任务,其中包括Map Task、Reduce Task和Fetch Task。Map Task和Reduce Task负责处理数据的计算和聚合,而Fetch Task则负责将最终的查询结果返回给客户端。

当所有的Map Task和Reduce Task完成后,Fetch Task被触发,它会从最终的输出文件中读取数据,并将数据发送给客户端。这通常是通过网络传输的方式完成的。

Fetch Task的目标是将查询结果高效地传输给客户端,以便用户可以及时地查看和分析数据。它在查询执行的最后阶段发挥重要作用,确保查询结果的及时可用性。

49.Hive 的函数:UDF、UDAF、UDTF 的区别?

UDF:单行进入,单行输出

//UDF的实例:编写一个UDF来将字符串转换为大写或小写。
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UpperLowerUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) {
            return null;
        }
        return new Text(input.toString().toUpperCase());
    }
}

UDAF:多行进入,单行输出

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
//UDAF的实例:编写一个UDAF来计算某个字段的平均值或总和
public class AverageUDAF extends UDAF {
    public static class AverageUDAFEvaluator implements UDAFEvaluator {
        private static class PartialResult {
            long sum;
            long count;
        }

        private PartialResult partial;

        public void init() {
            partial = null;
        }

        public boolean iterate(long value) {
            if (partial == null) {
                partial = new PartialResult();
            }
            partial.sum += value;
            partial.count++;
            return true;
        }

        public PartialResult terminatePartial() {
            return partial;
        }

        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
            }
            partial.sum += other.sum;
            partial.count += other.count;
            return true;
        }

        public double terminate() {
            if (partial == null) {
                return 0;
            }
            return (double) partial.sum / partial.count;
        }
    }
}

UDTF:单行输入,多行输出

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import java.util.ArrayList;
//编写一个UDTF来将一列数据拆分为多个行:
public class SplitUDTF extends GenericUDTF {
    private transient ObjectInspector[] inputOI;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("SplitUDTF only takes 1 argument: string");
        }

        // 输入参数的类型检查
        if (!(args[0] instanceof StringObjectInspector)) {
            throw new UDFArgumentException("SplitUDTF takes a string as parameter");
        }

        // 输入参数的类型和字段名称
        inputOI = args;

        // 输出的字段名称
        ArrayList<String> fieldNames = new ArrayList<>();
        fieldNames.add("value");

        // 输出的字段类型
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String input = ((StringObjectInspector) inputOI[0]).getPrimitiveJavaObject(args[0]);

        if (input != null) {
            String[] values = input.split(",");

            for (String value : values) {
                forward(new Object[]{value});
            }
        }
    }

    @Override
    public void close() throws HiveException {
        // 清理资源
    }
}

50.说说对 Hive 桶表的理解?

桶表是对数据进行哈希取值,然后放到不同文件中存储。数据加载到桶表时,会对字段取 hash 值,然后与桶的数量取模。把数据放到对应的文件中。物理上,每个桶就是表(或分区)目录里的一个文件,一个作业产生的桶(输出文件)和 reduce 任务个数相同。桶表专门用于抽样查询,是很专业性的,不是日常用来存储数据的表,需要抽样查询时,才创建和使用桶表。

51.Fetch抓取

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

52. 海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10

方案1:

a) 在每台电脑上求出TOP10,可以采用包含10个元素的堆完成(TOP10小,用最大堆,TOP10大,用最小堆)。

b) 比如求TOP10大,我们首先取前10个元素调整成最小堆,如果发现,然后扫描后面的数据,并与堆顶元素比较,如果比堆顶元素大,那么用该元素替换堆顶,然后再调整为最小堆。

c) 最后堆中的元素就是TOP10大。

方案2

a) 求出每台电脑上的TOP10后,然后把这100台电脑上的TOP10组合起来,共1000个数据

b) 再利用上面类似的方法求出TOP10就可以了。

53. Hive中追加导入数据的4种方式是什么?请写出简要语法

\1) 从本地导入:

load data local  inpath ‘/home/1.txt’ (overwrite) into table student;

\2) 从Hdfs导入:

load data  inpath ‘/**user**/hive/warehouse/1.txt’ (overwrite) into table student;

\3) 查询导入:

create table student1 as select * from student;(也可以具体查询某项数据)

4)查询结果导入:

insert (overwrite)into table  staff  select   * from track_log;

54. Hive导出数据有几种方式?如何导出数据

1) 用insert overwrite导出方式

a) 导出到本地:

insert overwrite local directory ‘/home/robot/1/2’ rom format delimited fields terminated by ‘*t’ **select \ from staff;(递归创建目录)

b) 导出到HDFS

insert overwrite directory ‘/user/hive/1/2’ rom format delimited fields terminated by ‘*t’ **select \ from staff;

2) Bash shell覆盖追加导出

例如:$ bin/hive -e “select * from staff;” > /home/z/backup.log

3) Sqoop把hive数据导出到外部

55. Hive优化

1) 通用设置

hive.optimize.cp=true:列裁剪
hive.optimize.prunner:分区裁剪
hive.limit.optimize.enable=true:优化LIMIT n语句
hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大文件数

2) 本地模式(小任务)

a)job的输入数据大小必须小于参数:
hive.exec.mode.local.auto.inputbytes.max(默认128MB)

b)job的map数必须小于参数:
hive.exec.mode.local.auto.tasks.max(默认4)

c)job的reduce数必须为0或者1
hive.exec.mode.local.auto.inputbytes.max=134217728 hive.exec.mode.local.auto.tasks.max=4 hive.exec.mode.local.auto=true hive.mapred.local.mem:本地模式启动的JVM内存大小

3) 并发执行

hive.exec.parallel=true ,默认为false
hive.exec.parallel.thread.number=8

4) Strict Mode:

hive.mapred.mode=true,严格模式不允许执行以下查询:

  • 分区表上没有指定了分区;
  • 没有limit限制的order by语句;
  • 笛卡尔积:JOIN时没有ON语句;

5) 动态分区

// hive.exec.dynamic.partition.mode=strict
hive.exec.max.dynamic.partitions=1000
//在每一个mapper/reducer节点允许创建的最大分区数
hive.exec.max.dynamic.partitions.pernode=100
//允许DATANODE打开多少个文件DATANODE:
dfs.datanode.max.xceivers=8192

6) 推测执行

mapred.map.tasks.speculative.execution=true mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;

7) 多个group by合并

hive.multigroupby.singlemar=true
当多个GROUP BY语句有相同的分组列,则会优化为一个MR任务

8) 虚拟列

hive.exec.rowoffset
是否提供虚拟列

9) 分组

  • 两个聚集函数不能有不同的DISTINCT列,以下表达式是错误的:
    INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;

  • SELECT语句中只能有GROUP BY的列或者聚集函数。

10) Combiner聚合
// 在map中会做部分聚集操作,效率更高但需要更多的内存。
hive.map.aggr=true;
// 在Map端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval;

11) 数据倾斜

  • hive.groupby.skewindata=true;数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。
  • 第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key

有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

  • 第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

12) 排序

ORDER BY colName ASC/DESC
hive.mapred.mode=strict 时需要跟limit子句
hive.mapred.mode=nonstrict 时使用单个reduce完成排序
SORT BY colName ASC/DESC :每个reduce内排序
DISTRIBUTE BY(子查询情况下使用 ):控制特定行应该到哪个reducer,并不保证reduce内数据的顺序
CLUSTER BY :当SORT BY 、DISTRIBUTE BY使用相同的列时。

13) 合并小文件

// 合并map输出
hive.merg.mapfiles=true;
// 合并reduce输出
hive.merge.mapredfiles=false;
// 合并文件的大小
hive.merge.size.per.task=256*1000*1000;
// 如果支持CombineHiveInputFormat则生成只有Map的任务执行merge hive.mergejob.maponly=true;
// 文件的平均大小小于该值时,会启动一个MR任务执行merge。hive.merge.smallfiles.avgsize=16000000;

14) 自定义map/reduce数目

  • 减少map数目:   
    set mapred.max.split.size set mapred.min.split.size set mapred.min.split.size.per.node set mapred.min.split.size.per.rack set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

  • 增加map数目:

a) 当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。

b) 假设有这样一个任务:

select data_desc, count(1), count(distinct id),sum(case when …),sum(case when …),sum(…) from a group by data_desc

c) 如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。  

set mapred.reduce.tasks=10; create table a_1 as select * from a distribute by rand(123);

d) 这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。

  • reduce数目设置:

a) 参数1:hive.exec.reducers.bytes.per.reducer=1G:每个reduce任务处理的数据量

b) 参数2:hive.exec.reducers.max=999(0.95*TaskTracker数):每个任务最大的reduce数目

c) reducer数=min(参数2,总输入数据量/参数1)

d) set mapred.reduce.tasks:每个任务默认的reduce数目。典型为0.99*reduce槽数,hive将其设置为-1,自动确定reduce数目。

15) 使用索引:

hive.optimize.index.filter:自动使用索引

hive.optimize.index.groupby:使用聚合索引优化GROUP BY操作

56.Hive 数据模型

Hive中所有的数据都存储在HDFS中,没有专门的数据存储格式

在创建表时指定数据中的分隔符,Hive 就可以映射成功,解析数据。

Hive中包含以下数据模型:

**db:**在hdfs中表现为hive.metastore.warehouse.dir目录下一个文件夹

**table:**在hdfs中表现所属db目录下一个文件夹

**external table:**数据存放位置可以在HDFS任意指定路径

**partition:**在hdfs中表现为table目录下的子目录

**bucket:**在hdfs中表现为同一个表目录下根据hash散列之后的多个文件

57.常用操作

57.1.数据库相关

Hive配置单元包含一个名为 default 默认的数据库.

  • —创建数据库

create database [if not exists] ;

  • –显示所有数据库

show databases;

  • –删除数据库

drop database if exists [restrict|cascade];

默认情况下,hive不允许删除含有表的数据库,要先将数据库中的表清空才能drop,否则会报错

–加入cascade关键字,可以强制删除一个数据库

hive> drop database if exists users cascade;

  • –切换数据库

use ;

57.2.内部表外部表

建内部表

create table

student(Sno int,Sname string,Sex string,Sage int,Sdept string)

row format delimited fields terminated by ‘,’;

建外部表

create external table

student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string)

row format delimited fields terminated by ‘,’ location ‘/stu’;

内、外部表加载数据:

load data local inpath ‘/root/hivedata/students.txt’ overwrite into table student;

load data inpath ‘/stu’ into table student_ext;

57.3.创建分区表

  • 分区建表分为2种,一种是单分区,也就是说在表文件夹目录下只有一级文件夹目录。另外一种是多分区,表文件夹下出现多文件夹嵌套模式。

  • 单分区建表语句

create table day_table (id int, content string) partitioned by (dt string);

单分区表,按天分区,在表结构中存在id,content,dt三列。

  • 双分区建表语句

create table day_hour_table (id int, content string) partitioned by (dt string, hour string);

双分区表,按天和小时分区,在表结构中新增加了dt和hour两列。

导入数据
load data local inpath ‘/root/hivedata/dat_table.txt’ into table day_table partition(dt=’2017-07-07’);

load data local inpath ‘/root/hivedata/dat_table.txt’ into table day_hour_table partition(dt=’2017-07-07’, hour=’08’);

基于分区的查询:

SELECT day_table.* FROM day_table WHERE day_table.dt = ‘2017-07-07’;

查看分区

show partitions day_hour_table;

总的说来partition就是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。

  • 指定分隔符

—指定分隔符创建分区表

create table day_table (id int, content string) partitioned by (dt string) row format delimited fields terminated by ‘,’;

—复杂类型的数据表指定分隔符

数据如下

zhangsan beijing,shanghai,tianjin,hangzhou

wangwu shanghai,chengdu,wuhan,haerbin

建表语句

create table

complex\_array(name string,work\_locations array<string>) 

row format delimited fields terminated by '\t' 

collection items terminated by ',';

57.4.增删分区

  • 增加分区

alter table t_partition add partition (dt=’2008-08-08’) location ‘hdfs://node-21:9000/t_parti/‘;

执行添加分区 /t_parti文件夹下的数据不会被移动。并且没有分区目录dt=2008-08-08

  • 删除分区

alter table t_partition drop partition (dt=’2008-08-08’);

执行删除分区时/t_parti下的数据会被删除并且连同/t_parti文件夹也会被删除

注意区别于load data时候添加分区:会移动数据 会创建分区目录

57.5.hive中的join

准备数据

1,a

2,b

3,c

4,d

7,y

8,u

2,bb

3,cc

7,yy

9,pp

建表:

create table a(id int,name string)

row format delimited fields terminated by ‘,’;

create table b(id int,name string)

row format delimited fields terminated by ‘,’;

导入数据:

load data local inpath ‘/root/hivedata/a.txt’ into table a;

load data local inpath ‘/root/hivedata/b.txt’ into table b;

实验:

** inner join

select * from a inner join b on a.id=b.id;

+——-+———+——-+———+–+

| a.id | a.name | b.id | b.name |

+——-+———+——-+———+–+

| 2 | b | 2 | bb |

| 3 | c | 3 | cc |

| 7 | y | 7 | yy |

+——-+———+——-+———+–+

**left join

select * from a left join b on a.id=b.id;

+——-+———+——-+———+–+

| a.id | a.name | b.id | b.name |

+——-+———+——-+———+–+

| 1 | a | NULL | NULL |

| 2 | b | 2 | bb |

| 3 | c | 3 | cc |

| 4 | d | NULL | NULL |

| 7 | y | 7 | yy |

| 8 | u | NULL | NULL |

+——-+———+——-+———+–+

**right join

select * from a right join b on a.id=b.id;

select * from b right join a on b.id=a.id;

+——-+———+——-+———+–+

| a.id | a.name | b.id | b.name |

+——-+———+——-+———+–+

| 2 | b | 2 | bb |

| 3 | c | 3 | cc |

| 7 | y | 7 | yy |

| NULL | NULL | 9 | pp |

+——-+———+——-+———+–+

**

select * from a full outer join b on a.id=b.id;

+——-+———+——-+———+–+

| a.id | a.name | b.id | b.name |

+——-+———+——-+———+–+

| 1 | a | NULL | NULL |

| 2 | b | 2 | bb |

| 3 | c | 3 | cc |

| 4 | d | NULL | NULL |

| 7 | y | 7 | yy |

| 8 | u | NULL | NULL |

| NULL | NULL | 9 | pp |

+——-+———+——-+———+–+

**hive中的特别join

select * from a left semi join b on a.id = b.id;

select a.* from a inner join b on a.id=b.id;

+——-+———

| a.id | a.name

+——-+———

| 2 | b

| 3 | c

| 7 | y

+——-+———

相当于

select a.id,a.name from a where a.id in (select b.id from b); 在hive中效率极低

select a.id,a.name from a join b on (a.id = b.id);

select * from a inner join b on a.id=b.id;

cross join(##慎用)

返回两个表的笛卡尔积结果,不需要指定关联键。

select a.*,b.* from a cross join b;

57.6.json解析

1、先加载rating.json文件到hive的一个原始表 rat_json

样例:{“movie”:”1193”,”rate”:”5”,”timeStamp”:”978300760”,”uid”:”1”}

create table rat_json(line string) row format delimited;

load data local inpath ‘/root/hivedata/rating.json’ into table rat_json;

2、需要解析json数据成四个字段,插入一张新的表 t_rating

drop table if exists t_rating;

create table t_rating(movieid string,rate int,timestring string,uid string)

row format delimited fields terminated by ‘\t’;

3、json表数据解析到rating表中

insert overwrite table t_rating

select

get_json_object(line,’$.movie’) as moive,

get_json_object(line,’$.rate’) as rate,

get_json_object(line,’$.timeStamp’) as timestring, get_json_object(line,’$.uid’) as uid

from rat_json limit 10;

58.常用函数

58.1.数值函数

  • 指定精度取整函数 : round

语法: round(double a, int d)

返回值: DOUBLE

说明: 返回指定精度d的double类型

举例:

hive> select round(3.1415926,4) from dual;

3.1416

  • 向下取整函数 : floor

语法: floor(double a)

返回值: BIGINT

说明: 返回等于或者小于该double变量的最大的整数

举例:

hive> select floor(3.1415926) from dual;

3

hive> select floor(25) from dual;

25

  • 向上取整函数 : ceil

语法: ceil(double a)

返回值: BIGINT

说明: 返回等于或者大于该double变量的最小的整数

举例:

hive> select ceil(3.1415926) from dual;

4

hive> select ceil(46) from dual;

46

  • 取随机数函数 : rand

语法: rand(),rand(int seed)

返回值: double

说明: 返回一个0到1范围内的随机数。如果指定种子seed,则会等到一个稳定的随机数序列

举例:

hive> select rand() from dual;

0.5577432776034763

  • 绝对值函数 : abs

语法: abs(double a) abs(int a)

返回值: double int

说明: 返回数值a的绝对值

举例:

hive> select abs(-3.9) from dual;

3.9

hive> select abs(10.9) from dual;

10.9

58.2日期函数

  • to_date(string timestamp):返回时间字符串中的日期部分,

。如to_date(‘1970-01-01 00:00:00’)=‘1970-01-01’

  • current_date:返回当前日期
  • year(date):返回日期date的年,类型为int
  • 。如year(‘2019-01-01’)=2019
  • month(date):返回日期date的月,类型为int,

。如month(‘2019-01-01’)=1

  • day(date): 返回日期date的天,类型为int,

。如day(‘2019-01-01’)=1

  • weekofyear(date1):返回日期date1位于该年第几周。

。如weekofyear(‘2019-03-06’)=10

  • datediff(date1,date2):返回日期date1与date2相差的天数

。如datediff(‘2019-03-06’,‘2019-03-05’)=1

  • date_add(date1,int1):返回日期date1加上int1的日期

。如date_add(‘2019-03-06’,1)=‘2019-03-07’

  • date_sub(date1,int1):返回日期date1减去int1的日期

。如date_sub(‘2019-03-06’,1)=‘2019-03-05’

  • months_between(date1,date2):返回date1与date2相差月份

。如months_between(‘2019-03-06’,‘2019-01-01’)=2

  • add_months(date1,int1):返回date1加上int1个月的日期,int1可为负数

。如add_months(‘2019-02-11’,-1)=‘2019-01-11’

  • last_day(date1):返回date1所在月份最后一天

。如last_day(‘2019-02-01’)=‘2019-02-28’

  • next_day(date1,day1):返回日期date1的下个星期day1的日期。day1为星期X的英文前两字母

。如next_day(‘2019-03-06’,‘MO’) 返回’2019-03-11’

  • **trunc(date1,string1)😗*返回日期最开始年份或月份。string1可为年(YYYY/YY/YEAR)或月(MONTH/MON/MM)。

。如trunc(‘2019-03-06’,‘MM’)=‘2019-03-01’,trunc(‘2019-03-06’,‘YYYY’)=‘2019-01-01’

  • unix_timestamp():返回当前时间的unix时间戳,可指定日期格式。

。如unix_timestamp(‘2019-03-06’,‘yyyy-mm-dd’)=1546704180

  • from_unixtime():返回unix时间戳的日期,可指定格式。

。如select from_unixtime(unix_timestamp(‘2019-03-06’,‘yyyy-mm-dd’),‘yyyymmdd’)=‘20190306’

58.3.条件函数

  • if(boolean,t1,t2):若布尔值成立,则返回t1,反正返回t2。

。如if(1>2,100,200)返回200

  • case when boolean then t1 else t2 end:若布尔值成立,则t1,否则t2,可加多重判断
  • coalesce(v0,v1,v2):返回参数中的第一个非空值,若所有值均为null,则返回null。

。如coalesce(null,1,2)返回1

  • isnull(a):若a为null则返回true,否则返回false

日期函数总结

select
     day                                                                                                   -- 时间
    ,date_add(day,1 - dayofweek(day))                                                  as week_first_day   -- 本周第一天_周日
    ,date_add(day,7 - dayofweek(day))                                                  as week_last_day    -- 本周最后一天_周六
    ,date_add(day,1 - case when dayofweek(day) = 1 then 7 else dayofweek(day) - 1 end) as week_first_day   -- 本周第一天_周一
    ,date_add(day,7 - case when dayofweek(day) = 1 then 7 else dayofweek(day) - 1 end) as week_last_day    -- 本周最后一天_周日
    ,next_day(day,'TU')                                                                as next_tuesday     -- 当前日期的下个周二
    ,trunc(day,'MM')                                                                   as month_first_day  -- 当月第一天
    ,last_day(day)                                                                     as month_last_day   -- 当月最后一天
    ,to_date(concat(year(day),'-',lpad(ceil(month(day)/3) * 3 -2,2,0),'-01'))          as season_first_day -- 当季第一天
    ,last_day(to_date(concat(year(day),'-',lpad(ceil(month(day)/3) * 3,2,0),'-01')))   as season_last_day  -- 当季最后一天
    ,trunc(day,'YY')                                                                   as year_first_day   -- 当年第一天
    ,last_day(add_months(trunc(day,'YY'),12))                                          as year_last_day    -- 当年最后一天
    ,weekofyear(day)                                                                   as weekofyear       -- 当年第几周
    ,second(day)                                                                       as second           -- 秒钟
    ,minute(day)                                                                       as minute           -- 分钟
    ,hour(day)                                                                         as hour             -- 小时
    ,day(day)                                                                          as day              -- 日期
    ,month(day)                                                                        as month            -- 月份
    ,lpad(ceil(month(day)/3),2,0)                                                      as season           -- 季度
    ,year(day)                                                                         as year             -- 年份
from (
    select '2018-01-02 01:01:01' as day union all
    select '2018-02-02 02:03:04' as day union all
    select '2018-03-02 03:05:07' as day union all
    select '2018-04-02 04:07:10' as day union all
    select '2018-05-02 05:09:13' as day union all
    select '2018-06-02 06:11:16' as day union all
    select '2018-07-02 07:13:19' as day union all
    select '2018-08-02 08:15:22' as day union all
    select '2018-09-02 09:17:25' as day union all
    select '2018-10-02 10:19:28' as day union all
    select '2018-11-02 11:21:31' as day union all
    select '2018-12-02 12:23:34' as day
) t1
;

获取当前时间截:

1 select unix_timestamp() ;
2 +-------------+--+
3 |     _c0     |
4 +-------------+--+
5 | 1521684090  |
6 +-------------+--+

获取当前时间1:

1 select current_timestamp;
2 +--------------------------+--+
3 |           _c0            |
4 +--------------------------+--+
5 | 2018-03-22 10:04:02.568  |
6 +--------------------------+--+

获取当前时间2:

1 SELECT from_unixtime(unix_timestamp());
2 +----------------------+--+
3 |         _c0          |
4 +----------------------+--+
5 | 2018-03-22 10:04:38  |
6 +----------------------+--+

获取当前日期:

1 SELECT CURRENT_DATE;
2 +-------------+--+
3 |     _c0     |
4 +-------------+--+
5 | 2018-03-22  |
6 +-------------+--+

日期差值:datediff(结束日期,开始日期),返回结束日期减去开始日期的天数。

1 select datediff(CURRENT_DATE,'2017-01-01') as datediff;
2 +-----------+--+
3 | datediff  |
4 +-----------+--+
5 | 445       |
6 +-----------+--+

日期加减:date_add(时间,增加天数),返回值为时间天+增加天的日期;date_sub(时间,减少天数),返回日期减少天后的日期。

1 select date_add(current_date,365) as dateadd;
2 +-------------+--+
3 |   dateadd   |
4 +-------------+--+
5 | 2019-03-22  |
6 +-------------+--+

时间差:两个日期之间的小时差

1 select (hour('2018-02-27 10:00:00')-hour('2018-02-25 12:00:00')+(datediff('2018-02-27 10:00:00','2018-02-25 12:00:00'))*24) as hour_subValue;
2 +----------------+--+
3 | hour_subValue  |
4 +----------------+--+
5 | 46             |
6 +----------------+--+

获取年、月、日、小时、分钟、秒、当年第几周

 1 select 
 2      year('2018-02-27 10:00:00')       as year
 3     ,month('2018-02-27 10:00:00')      as month
 4     ,day('2018-02-27 10:00:00')        as day
 5     ,hour('2018-02-27 10:00:00')       as hour
 6     ,minute('2018-02-27 10:00:00')     as minute
 7     ,second('2018-02-27 10:00:00')     as second
 8     ,weekofyear('2018-02-27 10:00:00') as weekofyear
 9 ; 
10 +-------+--------+------+-------+---------+---------+-------------+--+
11 | year  | month  | day  | hour  | minute  | second  | weekofyear  |
12 +-------+--------+------+-------+---------+---------+-------------+--+
13 | 2018  | 2      | 27   | 10    | 0       | 0       | 9           |
14 +-------+--------+------+-------+---------+---------+-------------+--+

转成日期:

1 select to_date('2018-02-27 10:03:01') ;
2 +-------------+--+
3 |     _c0     |
4 +-------------+--+
5 | 2018-02-27  |
6 +-------------+--+

当月最后一天:

1 select  last_day('2018-02-27 10:03:01');
2 +-------------+--+
3 |     _c0     |
4 +-------------+--+
5 | 2018-02-28  |
6 +-------------+--+

当月第一天:

1 select trunc(current_date,'MM') as day;
2 +-------------+--+
3 |     day     |
4 +-------------+--+
5 | 2018-03-01  |
6 +-------------+--+

当年第一天:

1 select trunc(current_date,'YY') as day;
2 +-------------+--+
3 |     day     |
4 +-------------+--+
5 | 2018-01-01  |
6 +-------------+--+

next_day,返回当前时间的下一个星期几所对应的日期

1 select next_day('2018-02-27 10:03:01', 'TU');
2 +-------------+--+
3 |     _c0     |
4 +-------------+--+
5 | 2018-03-06  |
6 +-------------+--+

hive中怎么获取两个日期相减后的小时(精确到两位小数点),而且这两个日期有可能会出现一个日期有时分秒,一个日期没有时分秒的情况

select
     t3.day1
    ,t3.day2
    ,t3.day  -- 日期
    ,t3.hour -- 小时
    ,t3.min  -- 分钟
    ,t3.day + t3.hour          as hour_diff_1
    ,t3.day + t3.hour + t3.min as hour_diff_2
    ,round((cast(cast(t3.day1 as timestamp) as bigint) - cast(cast(t3.day2 as timestamp) as bigint)) / 3600,2) as hour_diff_3 -- 最优
    ,(datediff(t3.day1,t3.day2) * 24) + (nvl(hour(t3.day1),0) - nvl(hour(t3.day2),0)) + round((nvl(minute(t3.day1),0) - nvl(minute(t3.day2),0)) / 60,2) as hour_diff_4
from (
    select
         t2.day1
        ,t2.day2
        ,(datediff(t2.day1,t2.day2) * 24)                  as day  -- 日期
        ,(hour(t2.day1) - hour(t2.day2))                   as hour -- 小时
        ,round((minute(t2.day1) - minute(t2.day2)) / 60,2) as min  -- 分钟
    from (
        select
             cast(t1.day1 as timestamp) as day1
            ,cast(t1.day2 as timestamp) as day2
        from (
            select '2018-01-03 02:30:00' as day1, '2018-01-02 23:00:00' as day2 union all
            select '2018-06-02 08:15:22' as day1, '2018-06-02 06:11:16' as day2 union all
            select '2018-07-04'          as day1, '2018-07-02 01:01:01' as day2
        ) t1
    ) t2
) t3
;
复制代码
复制代码
+------------------------+------------------------+------+-------+--------+--------------+--------------+--------------+--------------+--+
|          day1          |          day2          | day  | hour  |  min   | hour_diff_1  | hour_diff_2  | hour_diff_3  | hour_diff_4  |
+------------------------+------------------------+------+-------+--------+--------------+--------------+--------------+--------------+--+
| 2018-07-04 00:00:00.0  | 2018-07-02 01:01:01.0  | 48   | -1    | -0.02  | 47           | 46.98        | 46.98        | 46.98        |
| 2018-01-03 02:30:00.0  | 2018-01-02 23:00:00.0  | 24   | -21   | 0.5    | 3            | 3.5          | 3.5          | 3.5          |
| 2018-06-02 08:15:22.0  | 2018-06-02 06:11:16.0  | 0    | 2     | 0.07   | 2            | 2.07         | 2.07         | 2.07         |
+------------------------+------------------------+------+-------+--------+--------------+--------------+--------------+--------------+--+
复制代码

当周第一天,最后一天

date -d "2018-10-24 $(($(date -d 2018-10-24 +%u)-1)) days ago" +%Y-%m-%d
date -d "2018-10-24 $((7-$(date -d 2018-10-24 +%u))) days" +%Y-%m-%d

58.4.字符串函数

  • length(string1):返回字符串长度
  • concat(string1,string2):返回拼接string1及string2后的字符串
  • concat_ws(sep,string1,string2):返回按指定分隔符拼接的字符串
  • lower(string1):返回小写字符串,同lcase(string1)。upper()/ucase():返回大写字符串
  • trim(string1):去字符串左右空格,ltrim(string1):去字符串左空格。rtrim(string1):去字符串右空
  • repeat(string1,int1):返回重复string1字符串int1次后的字符串
  • reverse(string1):返回string1反转后的字符串。

。如reverse(‘abc’)返回’cba’

  • rpad(string1,len1,pad1):以pad1字符右填充string1字符串,至len1长度。

。如rpad(‘abc’,5,‘1’)返回’abc11’。lpad():左填充

  • split(string1,pat1):以pat1正则分隔字符串string1,返回数组。

。如split(‘a,b,c’,‘,’)返回[“a”,“b”,“c”]

  • substr(string1,index1,int1):以index位置起截取int1个字符。

。如substr(‘abcde’,1,2)返回’ab’
。hive> select substr(‘abcde’,3) fromlxw_dual;
cde
。hive> select substring(‘abcde’,3) fromlxw_dual;
cde
。hive> select substr(‘abcde’,-1) from lxw_dual; (和ORACLE相同,负数从最后一位开始截取)
e
。hive> select substr(‘abcde’,1,2) 和selectsubstr(‘abcde’,0,2)结果一样ab,默认都是从第一位开始取.

  • 正则匹配字符串regexp_extract
    语法: regexp_extract(string A, string pattern, int index)
    返回值: string
    说明:将字符串A按照正则表达式的规则拆分,返回index指定的部分(从1开始,0表示输出所有已匹配值)
    简而言之,index为0时会返回所有匹配,index为其他值n时,只返回第n段匹配
    regexp_extract('【e往无前】亚索您好,电刀正申请退货,验证码666666','[0-9]{6}',0)
    此时返回值为 正则匹配到的 666666
    regexp_extract('【e往无前】亚索您好,电刀正申请退货,验证码666666','(^【.*】).*[0-9]{6}',0)
    此时返回值为 括号括起来的段匹配,即【e往无前】
    regexp_extract('【e往无前】亚索您已掉线333天,封号666天','(^【.*】).*([0-9]{3})',2)
    此时返回值为 第2个括号括起来的匹配,即([0-9]{3}) 匹配到的 666
    额外说明: 返回值之所以不是前边的333 , 是因为在贪婪(默认)模式下,正则引擎会尽可能匹配更长的字符,若在.*后加上?则可关闭贪婪模式,开启懒惰模式。此时正则引擎会 尝试尽可能匹配更多次字符
    regexp_extract('【e往无前】亚索您已掉线333天,封号666天','(^【.*】).*?([0-9]{3})',2)
    此时返回值为 第2个括号括起来的匹配,即([0-9]{3}) 匹配到的 333
    regexp_extract('【e往无前】亚索您已掉线333天,封号666天','(^【.*】).*?([0-9]{3})',0)
    此时由于最后指定了0,即输出所有匹配,所以最后返回值为 【e往无前】亚索您已掉线333

58.5.类型转换

Hive的原子数据类型是可以进行隐式转换的,类似于Java的类型转换,例如某表达式使用INT类型,TINYINT会自动转换为INT类型,但是Hive不会进行反向转化,例如,某表达式使用TINYINT类型,INT不会自动转换为TINYINT类型,它会返回错误,除非使用CAST操作。

  • cast(value AS TYPE)

。select cast(‘1’ as DOUBLE); 返回1.0

59.hive常用的优化

59.1.Fetch抓取(Hive可以避免进行MapReduce)

Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

案例实操:

1)把hive.fetch.task.conversion设置成none,然后执行查询语句,都会执行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=none;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

2)把hive.fetch.task.conversion设置成more,然后执行查询语句,如下查询方式都不会执行mapreduce程序。

hive (default)> set hive.fetch.task.conversion=more;

hive (default)> select * from score;

hive (default)> select s_score from score;

hive (default)> select s_score from score limit 3;

59.2.本地模式

大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。

set hive.exec.mode.local.auto=true; //开启本地mr

//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M

set hive.exec.mode.local.auto.inputbytes.max=51234560;

//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4

set hive.exec.mode.local.auto.input.files.max=10;

案例实操:

1)开启本地模式,并执行查询语句

hive (default)> set hive.exec.mode.local.auto=true;

hive (default)> select * from score cluster by s_id;

18 rows selected (1.568 seconds)

2)关闭本地模式,并执行查询语句

hive (default)> set hive.exec.mode.local.auto=false;

hive (default)> select * from score cluster by s_id;

18 rows selected (11.865 seconds)

59.3.分区表分桶表

  • 分区表对sql过滤查询是一种优化
  • 分桶表对join操作时提升性能很大,桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。

59.4.join优化

59.4.1.小表Join大表

  • (新的版本当中已经没有区别了,旧的版本当中需要使用小表)

1)将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

2)多个表关联时,最好分拆成小段,避免大sql(无法控制中间Job)

3)大表Join大表

(1)空KEY过滤

有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空。

对比如下:

不过滤

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;

结果:

No rows affected (152.135 seconds)

过滤

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;

结果:

No rows affected (141.585 seconds)

59.4.2.mapjoin

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

1)开启MapJoin参数设置:

(1)设置自动选择Mapjoin

set hive.auto.convert.join = true; 默认为true

(2)大表小表的阈值设置(默认25M以下认为是小表):

set hive.mapjoin.smalltable.filesize=25123456;

59.5.group by

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。

并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

1)开启Map端聚合参数设置

(1)是否在Map端进行聚合,默认为True

set hive.map.aggr = true;

(2)在Map端进行聚合操作的条目数目

set hive.groupby.mapaggr.checkinterval = 100000;

(3)有数据倾斜的时候进行负载均衡(默认是false)

set hive.groupby.skewindata = true;

当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

59.6.Map数

1.通常情况下,作业会通过input的目录产生一个或者多个map任务。

主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M,可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);

2.举例:

a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数。

b) 假设input目录下有3个文件a,b,c大小分别为10m,20m,150m,那么hadoop会分隔成4个块(10m,20m,128m,22m),从而产生4个map数。即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。

3.是不是map数越多越好?

答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。

4.是不是保证每个map处理接近128m的文件块,就高枕无忧了?

答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。

针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数;

5.如何增加map数

如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。

set mapreduce.job.reduces =10;

create table a_1 as

select * from a

distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。

59.7.reduce数

1.调整reduce个数方法一

(1)每个Reduce处理的数据量默认是256MB

hive.exec.reducers.bytes.per.reducer=256123456

(2)每个任务最大的reduce数,默认为1009

hive.exec.reducers.max=1009

2.调整reduce个数方法二

在hadoop的mapred-default.xml文件中修改

设置每个job的Reduce个数

set mapreduce.job.reduces = 15;

3.reduce个数并不是越多越好

1)过多的启动和初始化reduce也会消耗时间和资源;

2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;

59.8.jvm重用

JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。

mapreduce.job.jvm.numtasks

10

How many tasks to run per jvm. If set to -1, there is

no limit.

我们也可以在hive当中通过

set mapred.job.reuse.jvm.num.tasks=10;

这个设置来设置我们的jvm重用

缺点

开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

59.9.数据压缩与存储格式

1.压缩方式

压缩可以节约磁盘的空间,基于文本的压缩率可达40%+; 压缩可以增加吞吐量和性能量(减小载入内存的数据量),但是在压缩和解压过程中会增加CPU的开销。所以针对IO密集型的jobs(非计算密集型)可以使用压缩的方式提高性能。 几种压缩算法:

2.存储格式

1.TextFile

Hive数据表的默认格式,存储方式:行存储。 可以使用Gzip压缩算法,但压缩后的文件不支持split 在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比SequenceFile高几十倍。

2.Sequence Files

Hadoop中有些原生压缩文件的缺点之一就是不支持分割。支持分割的文件可以并行 的有多个mapper程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。Sequence File是可分割的文件格式,支持Hadoop的block级压缩。 Hadoop API提供的一种二进制文件,以key-value的形式序列化到文件中。存储方式:行存储。 sequencefile支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,RECORD是默认选项,通常BLOCK会带来较RECORD更好的压缩性能。 优势是文件和hadoop api中的MapFile是相互兼容的

3.RCFile

存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:

首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低 其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取 数据追加:RCFile不支持任意方式的数据写操作,仅提供一种追加接口,这是因为底层的 HDFS当前仅仅支持数据追加写文件尾部。 行组大小:行组变大有助于提高数据压缩的效率,但是可能会损害数据的读取性能,因为这样增加了 Lazy 解压性能的消耗。而且行组变大会占用更多的内存,这会影响并发执行的其他MR作业。

4.ORCFile

存储方式:数据按行分块,每块按照列存储。

压缩快,快速列存取。效率比rcfile高,是rcfile的改良版本。

5.Parquet

Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间

6.自定义格式

可以自定义文件格式,用户可通过实现InputFormat和OutputFormat来自定义输入输出格式。

结论,一般选择orcfile/parquet + snappy 的方式

create table tablename (
xxx string,
xxx bigint,
)
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")

59.10.并行执行

  • 当一个sql中有多个job时候,且这多个job之间没有依赖,则可以让顺序执行变为并行执行(一般为用到union all )

// 开启任务并行执行
set hive.exec.parallel=true;

// 同一个sql允许并行任务的最大线程数
set hive.exec.parallel.thread.number=8;

59.11.合并小文件

小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:

//设置map输入的小文件合并
set mapred.max.split.size=256000000;

//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;

//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;

//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

设置map输出和reduce输出进行合并的相关参数:

//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true

//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true

//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000

//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000

60.hive的数据倾斜

表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。

原因:某个reduce的数据输入量远远大于其他reduce数据的输入量

1)、key分布不均匀

2)、业务数据本身的特性

3)、建表时考虑不周

4)、某些SQL语句本身就有数据倾斜

61.介绍一下Hive的架构

Hive可以通过CLI,JDBC和 ODBC 等客户端进行访问。除此之外,Hive还支持 WUI 访问
Hive内部执行流程:解析器(解析SQL语句)、编译器(把SQL语句编译成MapReduce程序)、优化器(优化MapReduce程序)、执行器(将MapReduce程序运行的结果提交到HDFS)
Hive的「元数据」保存在数据库中,如保存在MySQL,SQLServer,PostgreSQL,Oracle及Derby等数据库中。Hive中的元数据信息包含表名,列名,分区及其属性,表的属性(包括是否为外部表),表数据所在目录等。
Hive将大部分 HiveSQL语句转化为MapReduce作业提交到Hadoop上执行;少数HiveSQL语句不会转化为MapReduce作业,直接从DataNode上获取数据后按照顺序输出。

62.Hive窗口函数的区别

RANK() 排序相同时会重复,总数不会变,例如1224
DENSE_RANK() 排序相同时会重复,总数会减少,例如 1223
ROW_NUMBER() 会根据顺序去计算,例如 1234

63.是否自定义过UDF,UDTF,简述步骤

在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?

你可以这么回答:
一般用UDF函数解析公共字段;用UDTF函数解析事件字段

具体的步骤对应如下:

「自定义UDF」:继承UDF,重写 evaluate 方法

「自定义UDTF」:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close

为什么要自定义UDF/UDTF?
因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试

64.了解过数据倾斜吗,是如何产生的,你又是怎么解决的?

概念:
数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是「数据倾斜」

如何产生
① key的分布不均匀或者说某些key太集中

② 业务数据自身的特性,例如不同数据类型关联产生数据倾斜

③ SQL语句导致的数据倾斜

如何解决
① 开启map端combiner(不影响最终业务逻辑)

② 开启数据倾斜时负载均衡

③ 控制空值分布
将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分配到多个Reducer
④ SQL语句调整
a ) 选用join key 分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表join的时候,数据量相对变小的效果。
b ) 大小表Join:使用map join让小的维度表(1000条以下的记录条数)先进内存。在Map端完成Reduce。c ) 大表Join大表:把空值的Key变成一个字符串加上一个随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终的结果。
d ) count distinct大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

65.分区表和分桶表各自的优点能介绍一下吗?

分区表
介绍
1、分区使用的是表外字段,需要指定字段类型

2、分区通过关键字 partitioned by(partition_name string) 声明

3、分区划分粒度较粗

优点
将数据按区域划分开,查询时不用扫描无关的数据,加快查询速度

分桶表
介绍
1、分桶使用的是表内字段,已经知道字段类型,不需要再指定。

2、分桶表通过关键字clustered by(column_name) into … buckets声明

3、分桶是更细粒度的划分、管理数据,可以对表进行先分区再分桶的划分策略

优点
用于数据取样;能够起到优化加速的作用

分桶逻辑:对分桶字段求哈希值,用哈希值与分桶的数量取余,余几,这个数据就放在那个桶内。

66.使用过Hive的视图和索引吗,简单介绍一下

Hive视图
视图是一种使用查询语句定义的「虚拟表」,是数据的一种「逻辑结构」,创建视图时不会把视图存储到磁盘上,定义视图的查询语句只有在执行视图的语句时才会被执行。

通过引入视图机制,可以简化查询逻辑,提高了用户效率与用户满意度。

 「注意:」视图是只读的,不能向视图中插入或是加载数据

Hive索引
和关系型数据库中的索引一样,Hive也支持在表中建立索引。适当的索引可以优化Hive查询数据的性能。但是索引需要额外的存储空间,因此在创建索引时需要考虑索引的必要性。

 「注意:」Hive不支持直接使用DROP TABLE语句删除索引表。如果创建索引的表被删除了,则其对应的索引和索引表也会被删除;如果表的某个分区被删除了,则该分区对应的分区索引也会被删除。

67.hive sql提交流程和各组件的作用

用户提交 SQL 语句:用户通过 Hive 客户端提交 SQL 语句到 Hive 服务器。

解析器(Parser):Hive 服务器接收到 SQL 语句后,由解析器对其进行语法分析和解析。

语法树(AST)生成:通过解析器生成语法树,即 AST(Abstract Syntax Tree)。

查询优化器(Optimizer):对语法树进行优化,如去除无用的表、列、分区等,以提高查询效率。

查询计划生成器(Plan Generator):将优化后的语法树生成执行计划,即 DAG(有向无环图)。

执行计划执行器(Execution Engine):执行计划主要包括 MapReduce、Tez 等执行引擎,在这里会将执行计划翻译成相应的 MapReduce 或 Tez 作业,并提交到 Hadoop 集群中执行。

68. hive的metastore服务是做什么,hiveserver服务做什么

metastore负责管理hive的元数据信息,并将元数据存储在关系型数据库中。Hive metastore 服务主要提供了以下功能:

元数据存储:将 Hive 的元数据信息存储到关系型数据库中,实现了 Hive 的元数据管理。

元数据访问:提供元数据查询、修改、删除等 API 接口。

权限控制:通过元数据信息来进行权限控制,限制用户对数据的访问权限。

数据类型管理:维护 Hive 支持的所有数据类型信息,以支持数据的转换和操作。

HiveServer 服务则是 Hive 的客户端程序和服务器程序之间的桥梁,它主要负责接收来自客户端的请求,将请求转换成 HQL 查询语句,并将查询结果返回给客户端。HiveServer 主要具备以下功能:

提供 SQL 接口:HiveServer 通过提供 SQL 接口,允许其他应用程序连接到 Hive 并通过 SQL 语句进行查询、更新数据等操作。

多用户并发处理:允许多个用户同时连接到 HiveServer,执行 Hive 查询。

安全认证:HiveServer 支持 Kerberos 认证,可以保证安全访问 Hive。

查询缓存:HiveServer 可以通过缓存查询结果来提高查询性能和减轻系统负载。

安全审计:记录每个请求的用户、执行时间、执行结果等信息,实现对数据操作的审计和监控。

69. hive语句执行慢,怎么排查?如何解决?

检查sql,是否存在不合理的情况

数据分区:是否有做分区裁剪,避免全表扫描浪费时间

列裁剪:不要使用select *或者不参与计算的字段

数据序列化方式:选择orc和parquet支持列式存储,提高计算效率

70. hive调优?hive的null值一定会发生数据倾斜吗?大小表join一定会发生数据倾斜吗?

hive调优:

调整MapReduce参数:提高map、reduce内存大小

数据分区:选择合理的key,让数据可以均匀分布到不同的分区

使用map join让小表加载到内存中,提高关联计算效率

对于 null 值是否一定会发生数据倾斜的问题,答案是否定的。如果某个字段的 null 值占比非常高,那么可能会导致数据倾斜,但并不是一定会出现数据倾斜的情况,具体还要根据数据分布情况来判断。

对于大小表 join 是否一定会发生数据倾斜的问题,答案也是是否定的。大小表 join 本身并不会导致数据倾斜,但当 join 的字段分布不均匀时,就会导致数据倾斜问题。因此,需要根据实际情况进行调整和处理。如可以尝试使用 Map Join、使用分区表等措施来优化,从而避免数据倾斜问题的发生

72.hive分区损坏,如何恢复

针对分区损坏的表,首先需要查看表的元数据,找出哪些分区存在问题。可以使用 describe 命令查看表的元数据信息。

对于损坏的分区,尝试使用修复工具进行修复。例如,可以使用 MSCK REPAIR TABLE 命令对整个表进行修复,或者使用 ALTER TABLE … RECOVER PARTITIONS 命令来修复指定的分区。

如果修复工具无法正常修复分区,则需要手动重建分区。根据分区的特点,可以使用如 HDFS 命令、Hive 命令等多种方式来手动重建分区。

在手动重建分区的过程中,需要注意分区的存储格式和数据源的一致性。确保重建后的分区和其他分区存储格式相同,并且数据源和其他分区一致。

重建分区后,需要执行元数据更新操作,使得 Hive 的元数据信息与实际情况一致。可以使用 msck repair table 命令或者 alter table … add partition…命令来更新元数据。

73.hive元数据放在哪里,HD集群hive等元数据放在哪里?

hive元数据默认是放在derby中,也可以通过修改参数放到其他关系型数据库中,比如mysql
HD集群hive元数据放在DBServer中

74.sql考试

数据准备
建表

create table dept (deptid int ,deptname string,address string);
create table emp(empid int ,empname string,salary DECIMAL ,ruzhidate date,mgr int,deptid int);

插入数据

insert into dept values(1,'研发部','北京'),(2,'人事部','上海'),(3,'销售部','深圳'),(4,'公关部','东莞');
insert into emp values(1001,'tom',18000,'2013-10-11',1005,1),(1002,'jerry',13000,'2021-11-11',1005,1),(1003,'jack',10000,'2020-09-11',1001,1),(1004,'rose',5000,'2020-10-11',1001,2),(1005,'bob',20000,'2018-08-11',null,2)

面试题1:统计全公司工资最高的三个人

select * from emp sort by salary desc limit 3 

面试题2:查出部门的员工数和部门的名称

select d.deptname,count(*) from
dept d join emp e on d.deptid=e.deptid
group by d.deptname

面试题3:统计员工最高的工资是多少,以及这个人是谁

方式一:

select e.empname,e.salary from 
emp e
join
(select max(salary) as maxsalary from emp) t1
on e.salary=t1.maxsalary;
方式二:
select * from emp where salary =(select max(salary) from emp);

面试题4:每个部门最高的工资,以及是谁

select e.empname,e.salary,e.deptid from 
emp  e
join
(select max(salary) as maxsalary ,deptid from emp group by deptid) t1
on e.deptid=t1.deptid and e.salary=t1.maxsalary

面试题5:部门平均工资大于公司总的平均工资的部门是哪个部门

select t1.avg ,t1.deptid from 
(select avg(salary) as avg ,deptid,'link' as link from emp group by deptid) t1
join
(select avg(salary) as totalavg ,'link' as link from emp ) t2
on t1.link=t2.link where  t1.avg>t2.totalavg;

面试题6:求哪个员工的工资大于本部门的平均工资

select * from 
(select avg(salary) as avg ,deptid from emp group by deptid) t1
join
emp t2
on t1.deptid=t2.deptid where t2.salary>t1.avg;

数据准备
现有用户请求数据

用户id(uid) string
请求时间(req_time) string
请求url(req_url) string
处理日期(day) string
处理日期:20220919
uid    req_time    req_url
u0001    20200918102030    /index.jsp
u0001    20200918102035    /goods.action
u0002    20200918102030    /brand.action
u0001    20200918102130    /index.jsp
u0002    20200918102230    /index.jsp

处理日期:20220920
uid    req_time    req_url
u0001    20200919102030    /index.jsp
u0003    20200919102035    /goods.action
u0004    20200919102030    /brand.action
u0001    20200919102130    /index.jsp
u0003    20200919102230    /index.jsp

用户数据如下:

用户id(uid)
用户名称(uname)
uid    Name
u0001    用户1
U0002    用户2
U0003    用户3

面试题7:根据给的用户请求数据,创建textfile表 分区表hainiu.request_text ,创建每天处理日期的分区,并导入数据

create table request_text (
uid string,
req_time string ,
req_url string
) 
partitioned by (day string) 
row format delimited fields terminated by '\t';

load data local inpath '/home/hadoop/19' into table request_text partition(day='20220919');
load data local inpath '/home/hadoop/20' into table request_text partition(day='20220920');

面试题8:创建parquet分区表 hainiu.request_p ,创建每天处理日期的分区,并将hainiu.request_text 表每天分区数据导入到 request_p 每天分区中

create table request_p (
 uid string,
 req_time string ,
 req_url string
 ) 
 partitioned by (day string) 
 stored as parquet;

普通方式
insert into table request_p partition(day='20220919') select * from request_text where day='20220919';
insert into table request_p partition(day='20220920') select * from request_text where day='20220920';

动态分区方式
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;  
insert into table request_p partition(day) select * from request_text;

数据准备
创建用户表 user_text表 存储格式是 字段两个,数据用已经提供的数据

u0001 用户1
u0002 用户2
u0003 用户3

面试题9:创建parquet分区表 request_p2, 将用户信息表和request_p表中的数据导入到request_p2表中,格式要符合要求
表结构:
用户id
用户名称
请求日期 需要格式化 只保留 20200919 年月日
请求url
处理日期

create table request_p2(
uid string,
uname string,
req_day string,
req_url string)
partitioned by (day string) 
stored as parquet;

//动态分区的方式进行查询导入
insert into table request_p2 partition(day)
select t1.uid,t2.uname,t1.req_day,t1.req_url,t1.day from 
(select uid,substring(req_time,1,8) as req_day,req_url,day from request_p) t1
join
user_text t2 on t1.uid=t2.uid; 

面试题10: 用 request_p2 表 统计请求日期是20220918 ,处理日期是 20220919 请求的用户数(去重),导出到linux /home/hadoop/export/output1.txt文件中

方式1:
insert overwrite directory '/home/hadoop/export/' select count(distinct(uid)) from request_p2 where req_day='20220918' and day='20220919';

方式2:
hive -e "use hainiu;set mapreduce.job.queuename=hainiu;select count(distinct(uid)) from request_p2 where req_day='20200918' and day='20220919';" > /home/hadoop/export/output1.txt

面试题11:用 request_p2 表查询出每个请求日期每个用户的访问量,导出到linux /home/hadoop/export/output2.txt文件中

hive -e "use hainiu;set mapreduce.job.queuename=hainiu;select req_day,uid ,count(*) from request_p2 group by req_day,uid;" > /home/hadoop/export/output2.txt

面试题12:用request_p2 表 计算20号比19号的增量用户,导出到linux /home/hadoop/export/output3.txt文件中

hive -e "use hainiu;set mapreduce.job.queuename=hainiu;select t2.uid from (select uid from request_p2 where day='20220919' group by uid) t1 right join (select uid from request_p2 where day='20220920' group by uid) t2 on t1.uid=t2.uid where t1.uid is null;" > /home/hadoop/export/output3.txt

面试题13:查询request_p 处理日期是 20220919-20220920,每个用户的最后请求记录 导出到linux /home/hadoop/export/output4.txt文件中

hive -e "use hainiu;set mapreduce.job.queuename=hainiu;select * from (select uid,req_time,req_url,row_number() over(partition by uid order by req_time desc ) as lastreq  from request_p) t1 where lastreq=1; " > /home/hadoop/export/output4.txt

75.Hive表关联查询,如何解决数据倾斜的问题?

倾斜原因:
map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特、建表时考虑不周、等原因造成的reduce 上的数据量差异过大。

key分布不均匀;
业务数据本身的特性;
建表时考虑不周;
某些SQL语句本身就有数据倾斜;

如何避免:
对于key为空产生的数据倾斜,可以对其赋予一个随机值。

解决方案
参数调节:
hive.map.aggr = true
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定位true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。
SQL 语句调节:
选用join key分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表做join 的时候,数据量相对变小的效果。
大小表Join:使用map join让小的维度表(1000 条以下的记录条数)先进内存。在map端完成reduce。
大表Join大表:把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null 值关联不上,处理后并不影响最终结果。
count distinct大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

76.hive死锁解决

前面遇到过一次因为Hive中表被锁住了,导致定时任务一直失败。这两天又出现了表被锁,原因是连接hiveserver2过于频繁,mysql连接被打满,引发的连锁反应,导致们的小时任务一直失败,下午重点注意到这个问题,才解决好。

Hive中的锁
在执行insert into或insert overwrite任务时,中途手动将程序停掉,会出现卡死情况(无法提交MapReduce),只能执行查询操作,而drop insert操作均不可操作,无论执行多久,都会保持卡死状态。

查看Hive的中死锁,可以使用show locks [table]来查看。

clipboard

可以看到里面的那个Type下的EXCLUSIVE,这是一种互斥锁,需要解决,否则后续的查询和插入任务都会影响。 hive存在两种锁,共享锁Shared (S)和互斥锁Exclusive (X)

S X
S
X

锁的基本机制是:

元信息和数据的变更需要互斥锁
数据的读取需要共享锁
触发共享锁的操作是可以并发执行的,但是触发互斥锁,那么该表和该分区就不能并发的执行作业了。

对于上面的情况,使用解锁命令:

unlock table tableName
注意:表锁和分区锁是两个不同的锁,对表解锁,对分区是无效的,分区需要单独解锁

解锁方法
查看表被锁的情况:

show locks tableName

常规解锁方法:

unlock table 表名;  -- 解锁表
unlock table 表名 partition(dt='2014-04-01');  -- 解锁某个分区

高版本hive默认插入数据时,不能查询,因为有锁 可能出现的问题

解锁之路通常不是一帆风顺的,可能会遇到各种问题,笔者是在Hive2.1.1下面测试,比如:

clipboard3

这个命令无法执行,说LockManager没有指定,这时候需要执行命令:

set hive.support.concurrency=true;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager;

这样重新执行,命令就可以执行了

如果还!是!不!行,终极方法,可以直接去mysql元数据执行:

select * from HIVE_LOCKS;

查到所有的锁,然后根据条件把对应的锁删掉,这个锁住的表即可释放出来了。

delete from HIVE_LOCKS where HL_DB = 'cdn' and HL_TABLE = 'cdn_log_format';
注意:表名和字段都需要大写。

通过这种办法,通常可以彻底解决锁的问题

77.Lag 和Lead的使用

Lag和Lead分析函数可以在同一次查询中取出同一字段的后N行的数据(Lag)和前N行的数据(Lead)作为独立的列。

这种操作可以代替表的自联接,并且LAG和LEAD有更高的效率,其中over()表示当前查询的结果集对象,括号里面的语句则表示对这个结果集进行处理。

1、LEAD

与LAG相反,LEAD(col,n,DEFAULT) 用于统计窗口内往下第n行值

参数1为列名,参数2为往下第n行(可选,默认为1),参数3为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL)

场景
用户Peter在浏览网页,在某个时刻,Peter点进了某个页面,过一段时间后,Peter又进入了另外一个页面,如此反复,那怎么去统计Peter在某个特定网页的停留时间呢,又或是怎么统计某个网页用户停留的总时间呢?

create table test.user_log(
    userid string,
    time string,
    url string
) row format delimited fields terminated by '\t';
使用load命令将如下测试数据导入:

Peter   2015-10-12 01:10:00 url1
Peter   2015-10-12 01:15:10 url2
Peter   2015-10-12 01:16:40 url3
Peter   2015-10-12 02:13:00 url4
Peter   2015-10-12 03:14:30 url5
Marry   2015-11-12 01:10:00 url1
Marry   2015-11-12 01:15:10 url2
Marry   2015-11-12 01:16:40 url3
Marry   2015-11-12 02:13:00 url4
Marry   2015-11-12 03:14:30 url5

数据说明:Peter 2015-10-12 01:10:00 url1 ,表示Peter在2015-10-12 01:10:00进入了网页url2,即记录的是进入网页的时间。

分析
要计算Peter在页面url1停留的时间,需要用进入页面url2的时间,减去进入url1的时间,即2015-10-12 01:15:10这个时间既是离开页面url1的时间,也是开始进入页面url2的时间。

获取用户在某个页面停留的起始与结束时间:

select userid,
time stime,
lead(time) over(partition by userid order by time) etime,
url 
from test.user_log;

stime就是进入页面时间,etime就是离开页面时间,结果是这样的:

Marry   2015-11-12 01:10:00 2015-11-12 01:15:10 url1
Marry   2015-11-12 01:15:10 2015-11-12 01:16:40 url2
Marry   2015-11-12 01:16:40 2015-11-12 02:13:00 url3
Marry   2015-11-12 02:13:00 2015-11-12 03:14:30 url4
Marry   2015-11-12 03:14:30 NULL    url5
Peter   2015-10-12 01:10:00 2015-10-12 01:15:10 url1
Peter   2015-10-12 01:15:10 2015-10-12 01:16:40 url2
Peter   2015-10-12 01:16:40 2015-10-12 02:13:00 url3
Peter   2015-10-12 02:13:00 2015-10-12 03:14:30 url4
Peter   2015-10-12 03:14:30 NULL    url5

用etime减去stime,然后按照用户分组累加就是,每个用户访问的总时间了。

select userid,
time stime,
lead(time) over(partition by userid order by time) etime,
UNIX_TIMESTAMP(lead(time) over(partition by userid order by time),'yyyy-MM-dd HH:mm:ss')- UNIX_TIMESTAMP(time,'yyyy-MM-dd HH:mm:ss') period,
url 
from test.user_log;

这里展示出了stime(开始时间),etime(离开时间),period(停留时长),url(页面地址),结果:

Marry   2015-11-12 01:10:00 2015-11-12 01:15:10 310 url1
Marry   2015-11-12 01:15:10 2015-11-12 01:16:40 90  url2
Marry   2015-11-12 01:16:40 2015-11-12 02:13:00 3380    url3
Marry   2015-11-12 02:13:00 2015-11-12 03:14:30 3690    url4
Marry   2015-11-12 03:14:30 NULL    NULL    url5
Peter   2015-10-12 01:10:00 2015-10-12 01:15:10 310 url1
Peter   2015-10-12 01:15:10 2015-10-12 01:16:40 90  url2
Peter   2015-10-12 01:16:40 2015-10-12 02:13:00 3380    url3
Peter   2015-10-12 02:13:00 2015-10-12 03:14:30 3690    url4
Peter   2015-10-12 03:14:30 NULL    NULL    url5

这里有空的情况,也就是没有获取到离开时间,这要看实际业务怎么定义了,如果算到23点,太长了。

2、Lag

LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL)可以用来做一些时间的维护,如上一次登录时间。

场景
用户Peter在浏览网页,在某个时刻,Peter点进了某个页面,过一段时间后,Peter又进入了另外一个页面,如此反复,那怎么去统计Peter在某个特定网页的停留时间呢,又或是怎么统计某个网页用户停留的总时间呢?

create table test.user_log(
    userid string,
    time string,
    url string
) row format delimited fields terminated by '\t';

使用load命令将如下测试数据导入:

Peter   2015-10-12 01:10:00 url1
Peter   2015-10-12 01:15:10 url2
Peter   2015-10-12 01:16:40 url3
Peter   2015-10-12 02:13:00 url4
Peter   2015-10-12 03:14:30 url5
Marry   2015-11-12 01:10:00 url1
Marry   2015-11-12 01:15:10 url2
Marry   2015-11-12 01:16:40 url3
Marry   2015-11-12 02:13:00 url4
Marry   2015-11-12 03:14:30 url5

数据说明:Peter 2015-10-12 01:10:00 url1 ,表示Peter在2015-10-12 01:10:00进入了网页url2,即记录的是进入网页的时间。

select userid,
time etime,
lag(time, 1, '1970-01-01 00:00:00') over(partition by userid order by time) stime,
url 
from test.user_log;

这里etime是结束时间,stime是开始时间,结果:

Marry   2015-11-12 01:10:00 1970-01-01 00:00:00 url1
Marry   2015-11-12 01:15:10 2015-11-12 01:10:00 url2
Marry   2015-11-12 01:16:40 2015-11-12 01:15:10 url3
Marry   2015-11-12 02:13:00 2015-11-12 01:16:40 url4
Marry   2015-11-12 03:14:30 2015-11-12 02:13:00 url5
Peter   2015-10-12 01:10:00 1970-01-01 00:00:00 url1
Peter   2015-10-12 01:15:10 2015-10-12 01:10:00 url2
Peter   2015-10-12 01:16:40 2015-10-12 01:15:10 url3
Peter   2015-10-12 02:13:00 2015-10-12 01:16:40 url4
Peter   2015-10-12 03:14:30 2015-10-12 02:13:00 url5

计算总时间,只需要用结束时间 - 开始时间,然后分组累加即可。

select userid,
UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss') - 
UNIX_TIMESTAMP(lag(time, 1, '1970-01-01 00:00:00') over(partition by userid order by time), 'yyyy-MM-dd HH:mm:ss'),
url 
from test.user_log;

结果

Marry   1447290600  url1
Marry   310 url2
Marry   90  url3
Marry   3380    url4
Marry   3690    url5
Peter   1444612200  url1
Peter   310 url2
Peter   90  url3
Peter   3380    url4
Peter   3690    url5

因为有两个将默认值置为了1970-01-01,所以算出来比较大,实际工作中需要按照实际情况处理。

78.彻底解决Hive小文件问题

最近发现离线任务对一个增量Hive表的查询越来越慢,这引起了的注意,在cmd窗口手动执行count操作查询发现,速度确实很慢,才不到五千万的数据,居然需要300s,这显然是有问题的,推测可能是有小文件。

去hdfs目录查看了一下该目录:
发现确实有很多小文件,有480个小文件,觉得找到了问题所在,那么合并一下小文件吧:

insert into test select * from table distribute by floor (rand()*5);
这里使用distribute by进行了一个小文件的合并,通过rand() * 5,保证了从map端输出的数据,最多到5个reducer,将小文件数量控制了下来,现在只有3个文件了。

合并小文件后,再次做同样的查询,15s就完成了。确实忽略了,增量数据会导致小文件,应该在当初做的时候就做定时的小文件合并,而不是等到现在才发现。

因为这个表每天是有增量数据进去的,增量数据会单独生成一个文件,因为增量数据本身不大,日积月累就形成了大量小文件。不仅对namenode的内存造成压力,对map端的小文件合并也有很大压力。

小文件产生的原因
动态分区插入数据的时候,会产生大量的小文件;
数据源本身就包含有大量的小文件;
做增量导入,比如Sqoop数据导入,一些增量insert等;
分桶表,分桶表通常也会遇到小文件,本质上还是增量导入的问题;
可以修改的表,这种Hive表是可以进行修改的,通过配置stored as orc TBLPROPERTIES (“transactional”=”true”),这种表最坑,每天都会有一个快照,到后面10G大小的数据,表文件体积可以达到600G,时间越长越大;
小文件的问题有很多,实际中各种原因,由于自己的不小心,前期没有做好预防都会产生大量小文件,让线上的离线任务神不知鬼不觉,越跑越慢。

小文件的危害
给namenode内存中fsImage的合并造成压力,如果namenode内存使用完了,这个集群将不能再存储文件了;
虽然map阶段都设置了小文件合并,org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,太多小文件导致合并时间较长,查询缓慢;

小文件的解决方案
彻底解决小文件,分为了两个方向,一个是小文件的预防,一个是大量小文件问题已经出现了,们该怎么解决。

小文件的预防
网上有些解决方案,是调节参数,这些参数在使用的Hive2是默认都开启了的:

//每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;  
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)  
set mapred.min.split.size.per.rack=100000000;
//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000

有些公司用的版本不同,低版本可能有些配置不一样,最好检查一下上面这些配置是否设置,然后根据自己的实际集群情况进行设置。

小文件的预防,主要还是要根据小文件的产生原因,来进行预防。

动态分区插入的时候,保证有静态分区,不要误判导致产生大量分区,大量分区加起来,自然就有大量小文件;
如果源表是有大量小文件的,在导入数据到目标表的时候,如果只是insert into dis select * from origin的话,目标表通常也有很多小文件。如果有分区,比如dt, hour,可以使用distribute by dt, hour,保证每个小时的数据在一个reduce里面;
类似sqoop增量导入,还有hive一些表的查询增量导入,这些肯定是有小文件的,需要进行一周甚至一天定时任务的小文件合并。

小文件的解决
上面是平时开发数据任务时候,小文件的预防,但如果由于们的大意,小文件问题已经产生了,就需要解决了。通常就是insert overwrite了。

insert overwrite table test [partition(hour=…)] select * from test distribute by floor (rand()*5);
注:这个语句把test表的数据查询出来,overwrite覆盖test表,不用担心如果overwrite失败,数据没了,这里面是有事物性保证的,可以观察一下执行的时候,在test表hdfs文件目录下面有个临时文件夹。如果是分区表,加上partition,表示对该分区进行overwrite。

如果是orc格式存储的表,还可以使用alter table test [partition(…)] concatenate进行小文件的合并,不过这种方法仅仅适用于orc格式存储的表。 猜你喜欢 Hadoop3数据容错技术(纠删码)

79.常见函数使用

1. 数据脱敏函数

hive 有专门的脱敏函数供我们使用,就是mask()函数,返回值是 string 类型,默认需要脱敏的数据中大写字母就自动转换为 X,小写字母就自动转换为 x,数字就自动转换为 n,也可通过 mask()函数的参数来自定义转换格式。注意:入参也必须是string类型才不会有隐藏bug

select mask(要加密字段) from 表名                     -- 输出默认脱敏后的结果
select mask(要加密字段,'X','x','#') from 表名         -- 输出自定义脱敏后的结果
select mask_first_n(要加密的字段,n) from 表名         -- 对前n个字符进行脱敏
select mask_last_n(要加密的字段,n) from 表名          -- 对后n个字符进行脱敏
select mask_show_first_n(要加密的字段,n) from 表名    -- 对除了前n个字符之外的字符进行脱敏
select mask_show_last_n(要加密的字段,n) from 表名     -- 对除了后n个字符之外的字符进行脱敏
select mask_hash(字段) from 表名                     -- 对字段进行hash操作,若是非string类型的字段此函数就返回null

2. hive 获取当天时间

– PS:hive3版本对时间函数unix_timestamp()from_unixtime()做了重写,需要加8小时或者减8小时,结果才正确

select current_date -- 2022-06-19
select from_unixtime(unix_timestamp() + 8*3600) --  2022-06-19 15:30:54

3. hive 格式化时间数据

select from_unixtime(unix_timestamp() + 8*3600,'yyyy-MM') -- 2022-06
select date_format(from_unixtime(unix_timestamp()),'yyyy-MM') -- 2022-06

4. hive 获取本月第一天,本年第一天,上个月第一天,本月最后一天,下个月第一天等指标

select trunc(from_unixtime(unix_timestamp() + 8*3600),'MM') -- 2022-06-01
select trunc(from_unixtime(unix_timestamp() + 8*3600),'YEAR'); -- 2022-01-01
select trunc(add_months(from_unixtime(unix_timestamp() + 8*3600),-1),'MM') -- 2022-05-01
select last_day(from_unixtime(unix_timestamp() + 8*3600)) -- 2022-06-30
select trunc(add_months(from_unixtime(unix_timestamp() + 8*3600),1),'MM') -- 2022-07-01

5. datediff 日期比较函数第一个参数是结束日期,第二个是开始日期,返回结束日期减开始日期

select datediff('2020-07-05','2020-06-15'); -- 返回20,注意日期格式认准- ,如果是/则无效,得使用格式转换

6. hive 对 yyyy/MM/dd 格式的日期和 yyyy-MM-dd 格式的日期相互转换方案

第一种是通过from_unixtime()+unix_timestamp()转换时间戳方式转换

第二种是通过concat()+substr()拼接截取方式转换,

第三种是通过regexp_replace()正则匹配方式去掉横杠。

select 
     '2022/08/09' as source_text
    ,from_unixtime(unix_timestamp('2022/08/09','yyyy/MM/dd'),'yyyy-MM-dd') as func_text_1 -- 方案一
    ,concat(substr('2022/08/09',1,4),'-',substr('2022/08/09',6,2),'-',substr('2022/08/09',9,2)) as func_text_2 -- 方案二
    ,regexp_replace('2022/08/09','/','-') as func_text_3 -- 方案三

7. hive 的多行转多列 (重点)

方案一:利用拼接的方式构造map类型

方案二:利用if判断表达式+聚合收敛

-- 方案一,利用拼接的方式构造map类型
select stat_date
    ,event_list['test1'] as test1_cnt
    ,event_list['test2'] as test2_cnt
from 
(
    select 
         stat_date
        ,str_to_map(concat_ws(',',collect_list(concat_ws(':',event_name,cast(event_cnt as string))))) as event_list
    from
    (
        select 
             stat_date
            ,event_name
            ,count(1) as event_cnt
        from 表名
        where stat_date between 20220801 and 20220810
        and event_name in('test1','test2')
        group by stat_date 
                ,event_name
    ) s 
    group by stat_date
) w 

-- 方案二,利用if判断表达式
select 
     stat_date
    ,sum(if(event_name='test1',event_cnt,0)) as test1_cnt
    ,sum(if(event_name='test2',event_cnt,0)) as test2_cnt
from 
(
    select 
         stat_date
        ,event_name
        ,count(1) as event_cnt
    from 表名
    where stat_date between 20220801 and 20220810
    and event_name in('test1','test2')
    group by stat_date 
            ,event_name
) s 
group by stat_date

8. hive 查找数组内是否包含某个元素

select array_contains(array<int>,某元素); 注意:array_contains()函数支持 int 数组或者 string 数组,不支持 bigint 数据类型的数组。

9. hive 字符串数组类型的数据转为字符串数据

select concat_ws(',',array<string>);

10. hive 的空处理函数

coalesce(数据字段,’自定义值’) select coalesce(aaa,’空值清洗’)

80、介绍一下 Order By,Sort By,Distrbute By,Cluster By的区别

Order By(全局排序)
order by 会对输入做全局排序,因此只有一个reduce(多个reducer无法保证全局有序),也正因为只有一个 reducer,所以当输入的数据规模较大时,会导致计算的时间较长。

注意: Order by 和 数据库中的 Order by 功能一致,按照某一个或者字段排序输出。与数据库中 order by的区别在于在 hive 的严格模式下(hive.mapred.mode = strict)下,必须指定 limit ,否则执行会报错!

Sort By(每个MapReduce排序)
sort by并不是全局排序,其在数据进入reducer前完成排序。因此,如果用sort by进行排序,并且设置 mapred.reduce.tasks>1, 则sort by只保证每个reducer的输出有序,不保证全局有序。

拓展: ①sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响 ②sort by 的数据只能保证在同一reduce中的数据可以按指定字段排序 ③使用sort by 你可以指定执行的reduce 个数 (set mapred.reduce.tasks=),对输出的数据再执行归并排序,即可以得到全部结果 注意: 可以用 limit 子句大大减少数据量。使用 limit n 后,传输到 reduce 端(单机)的数据记录数就减少到 n* (map个数)。否则由于数据过大可能出不了结果。

Distrbute By(每个分区排序)
在有些情况下,我们需要控制某个特定行应该到哪个 reducer ,通常是为了进行后续的聚集操作。distribute by 子句可以做这件事。distribute by类似 MR 中 partition(自定义分区),进行分区,结合 sort by 使用。distribute by 和 sort by 的常见使用场景有:

Map输出的文件大小不均
Reduce输出文件不均
小文件过多
文件超大
Cluster By
当 distribute by 和 sorts by字段相同时,可以使用 cluster by 方式代替。cluster by除了具有 distribute by 的功能外还兼具 sort by 的功能。但是排序只能是 升序 排序,不能像distribute by 一样去指定排序的规则为 ASC 或者 DESC 。

81、HiveSQL 语句中 select from where group by having order by 的执行顺序

    平时没有仔细研究过,这题还真不好猜。

    实际上,在 hive 和 mysql 中都可以通过 explain+sql 语句,来查看执行顺序。对于一条标准 sql 语句,它的书写顺序是这样的:

    selectfromwheregroup byhavingorder bylimit

(1)mysql 语句执行顺序:

    from... where...group by... having.... select ... order by... limit …

(2)hive 语句执行顺序:

    from … where … selectgroup byhavingorder bylimit

拓展: 要搞清楚面试官问执行顺序背后的原因是什么,不是单纯的看你有没有背过这道题,而是看你是否能够根据执行顺序,写出不被人喷的 SQL
根据执行顺序,我们平时编写时需要记住以下几点:

使用分区剪裁、列剪裁,分区一定要加
少用 COUNT DISTINCT,group by 代替 distinct
是否存在多对多的关联
连接表时使用相同的关键词,这样只会产生一个 job
减少每个阶段的数据量,只选出需要的,在 join 表前就进行过滤
大表放后面
谓词下推:where 谓词逻辑都尽可能提前执行,减少下游处理的数据量
sort by 代替 order by

81、如何做 Hive优化

只要你是老司机,多面试几趟,你就会发现常用的组件,中大型公司面试基本都会问到你如何对其调优。这个我正好之前总结过,大家可以看下:

MapJoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

行列过滤
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。

    行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

合理设置Map数
是不是map数越多越好?

    答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费 。而且,同时可执行的map数是受限的。此时我们就应该减少map数量。

合理设置Reduce数
Reduce个数并不是越多越好

    (1)过多的启动和初始化Reduce也会消耗时间和资源;         (2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

    在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;

严格模式
严格模式下,会有以下特点:

    ①对于分区表,用户不允许扫描所有分区

    ②使用了order by语句的查询,要求必须使用limit语句

    ③限制笛卡尔积的查询

开启map端combiner(不影响最终业务逻辑)
这个就属于配置层面上的优化了,需要我们手动开启 set hive.map.aggr=true;

压缩(选择快的)
设置map端输出中间结、果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率)

小文件进行合并
在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

其他
列式存储,采用分区技术,开启JVM重用…类似的技术非常多,大家选择一些方便记忆的就足以在面试时回答这道题。

82、Hive如何避免小文件的产生,你会如何处理大量小文件?

关于小文件如何处理,也已经是老生常谈的问题。

小文件产生的原因有很多,例如:读取数据源时的大量小文件,使用动态分区插入数据时产生,Reduce/Task数量较多。

我们都知道,HDFS文件元数据存储在 NameNode 的内存中,在 内存空间有限的情况下,小文件过多会影响NameNode 的寿命,同时影响计算引擎的任务数量,比如每个小的文件都会生成一个Map任务。

那到底该如何解决小文件过多的问题呢?

解决的方法有:

    (1)合并小文件:对小文件进行归档(Har)、自定义Inputformat将小文件存储成SequenceFile文件。

    (2)采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景。

    (3)对于大量小文件Job,可以开启JVM重用。

    (4)当然,也可以直接设置相关的参数
        设置map输入的小文件合并:
        set mapped. max split size=256000000
        //一个节点上 split的至少的大小〔这个值决定了多个 DataNode上的文件是否需要合并
        set mapred. in split. size per.node=100000000
        //一个交换机下 split的至少的大小〔这个值决定了多个交换机上的文件是否需要合并
        /执行Map前进行小文件合井
        set hive. input format=org. apache hadoop. hive. ql.io.CombineHiveInputFormat:

        设置 map 输出 和 reduce 输出 进行合并的相关参数
        //设置map端输出进行合并,默认为true
        set hive. merge mapfiles =true
        //设置 reduce端输出进行合并,默认为 false
        set hive. merge. mapredfiles=true
        //设置合并文件的大小
        set hive. merge.size.per.task =256*1000*1000
        //当输出文件的平均大小小于该值时,启动一个独立的 MapReduce任务进行文件 merge
        set hive.merge.smallfiles.avgsize= 16000000

http://www.hainiubl.com/topics/76159
http://www.hainiubl.com/topics/76158
http://www.hainiubl.com/topics/76157
http://www.hainiubl.com/topics/76160

作者:admin  创建时间:2023-06-27 22:16
最后编辑:admin  更新时间:2024-04-07 15:40