Hadoop 大数据仓库 Hive:自动将 SQL 生成 MapReduce 代码。

1. Hive简介

Hive 是一个基于 Hadoop 文件系统之上的数据仓库架构。它为数据仓库的管理提供了许多功能:数据 ETL (抽取、转换和加载)工具、数据存储管理和大型数据集的查询和分析能力。同时 Hive 还定义了类 SQL的语言 – Hive QL. Hive QL 允许用户进行和 SQL 相似的操作,它可以将结构化的数据文件映射为一张数据库表,并提供简单的 SQL 查询功能。还允许开发人员方便地使用 Mapper 和 Reducer 操作,可以将 SQL 语句转换为 MapReduce 任务运行,这对 MapReduce 框架来说是一个强有力的支持。

Hadoop Ecosystem:

img

Hive和传统关系数据库的区别

Hive 在很多方面与传统关系数据库类似(例如支持 SQL 接口),但是其底层对 HDFS 和 MapReduce 的依赖意味着它的体系结构有别于传统关系数据库,而这些区别又影响着 Hive 所支持的特性,进而影响着 Hive 的使用。

我们可以列举一些简单区别:

  • Hive 和关系数据库存储文件的系统不同,Hive 使用的是 Hadoop 的HDFS(Hadoop的分布式文件系统),关系数据库则是服务器本地的文件系统;
  • Hive 使用的计算模型是 MapReduce,而关系数据库则是自己设计的计算模型;
  • 关系数据库都是为实时查询的业务进行设计的,而 Hive 则是为海量数据做数据挖掘设计的,实时性很差;实时性的区别导致 Hive 的应用场景和关系数据库有很大的不同;
  • Hive 很容易扩展自己的存储能力和计算能力,这个是继承 Hadoop 的,而关系数据库在这个方面要差很多。

Hive 构建在基于静态批处理的 Hadoop 之上,Hadoop 通常都有较高的延迟并且在作业提交和调度的时候需要大量的开销。因此,Hive 不适合在大规模数据集上实现低延迟快速的查询。

Hive架构图:

基本组成

Hive 并不适合那些需要低延迟的应用,例如,联机事务处理(OLTP)。Hive 查询操作过程严格遵守 Hadoop MapReduce 的作业执行模型,Hive 将用户的 HiveQL 语句通过解释器转换为 MapReduce 作业提交到 Hadoop 集群上,Hadoop 监控作业执行过程,然后返回作业执行结果给用户。Hive 并非为联机事务处理而设计,Hive 并不提供实时的查询和基于行级的数据更新操作。

Hive 的最佳使用场合是大数据集的批处理作业,例如,网络日志分析。

Hive 的数据存储是建立在 Hadoop 文件系统之上的。Hive 本身没有专门的数据存储格式,也不能为数据建立索引,因此用户可以非常自由地组织 Hive 中的表,只需要在创建表的时候告诉 Hive 数据中的列分隔符就可以解析数据了。

Hive 中主要包括 4 种数据模型:表(Table)外部表(External Table)分区(Partition)以及 桶(Bucket)

Hive 的表和数据库中的表在概念上没有什么本质区别,在 Hive 中每个表都有一个对应的存储目录。而外部表指向已经在 HDFS 中存在的数据,也可以创建分区。Hive 中的每个分区都对应数据库中相应分区列的一个索引,但是其对分区的组织方式和传统关系数据库不同。桶在指定列进行 Hash 计算时,会根据哈希值切分数据,使每个桶对应一个文件。

由于 Hive 的元数据可能要面临不断地更新、修改和读取操作,所以它显然不适合使用 Hadoop 文件系统进行存储。目前 Hive 把元数据存储在 RDBMS 中,比如存储在 MySQL, Derby 中。

2. Hive基本操作

2.1 数据定义 - DDL 2.2 数据操作 - DML 2.3 数据查询 - DQL

2.1 数据定义 - DDL

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
  [(col_name data_type [COMMENT col_comment], ...)] 
  [COMMENT table_comment] 
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
  [CLUSTERED BY (col_name, col_name, ...) 
  [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
  [ROW FORMAT row_format] 
  [STORED AS file_format] 
  [LOCATION hdfs_path]

上面的一些关键字解释:

建表(CREATE)

如果没有LOCATION,Hive将在HDFS上的/user/hive/warehouse文件夹下以外部表的表名创建一个文件夹,并将属于这个表的数据存放在这里。

为了避免Hive在查询时扫描全表,增加没有必要的消耗,因此在建表时加入partition。

hive> CREATE TABLE test3(
        id             INT,
        email          STRING,
        name           STRING
        )
        PARTITIONED BY(sign_date STRING,age INT);

使用了sign_date 和age 两个字段作为分区列。但是,我们必须先创建这两个分区,才能够使用。

hive> ALTER TABLE test3 add partition(sign_date='20190823',age=20);

Hive中的table可以拆分成partiton,table和partition又可以进一步通过CLUSTERED BY分成更小的文件bucket,这样使得多个文件可以在map上同时启动。 首先需要设置环境变量 hive>set hive.enforce.bucketing = true;

hive> CREATE TABLE test4(
        id             INT,
        email          STRING,
        name           STRING,
        age            INT
        )
        PARTITIONED BY(sign_date STRING)
        CLUSTERED BY(id)SORTED BY(age) INTO 5 BUCKETS
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

2.2 数据操作 - DML

2.3 数据查询 - DQL

(1)基本的 Select 操作

SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM table_reference
    [WHERE where_condition]
    [GROUP BY col_list]
    [   CLUSTER BY col_list
      | [DISTRIBUTE BY col_list] [SORT BY| ORDER BY col_list]
    ]
    [LIMIT number]

(2)基于 Partition 的查询

一般 SELECT 查询会扫描整个表,使用 PARTITIONED BY 子句建表,查询就可以利用分区剪枝(input pruning)的特性。 Hive 当前的分区剪枝,只有分区断言出现在离 FROM 子句最近的那个 WHERE 子句中,才会启用分区剪枝。

下面是两个例子page_view根据date分区:

SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31'
SELECT page_views.* 
FROM page_views JOIN dim_users
ON (page_views.user_id = dim_users.id AND page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31')

(3)HAVING 查询

Hive在0.7.0版本中添加了对HAVING语句的支持,在旧版本的Hive中,使用一个子查询也可以实现相同的效果。

SELECT col1 FROM t1 GROUP BY col1 HAVING SUM(col2) > 10

等价于

SELECT col1 FROM (SELECT col1, SUM(col2) AS col2sum FROM t1 GROUP BY col1) t2 WHERE t2.col2sum > 10

(4)Join 查询

Join 的语法如下:

join_table:
    table_reference JOIN table_factor [join_condition]
  | table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
  | table_reference LEFT SEMI JOIN table_reference join_condition
  | table_reference CROSS JOIN table_reference [join_condition] (as of Hive 0.10)

table_reference:
    table_factor
  | join_table

table_factor:
    tbl_name [alias]
  | table_subquery alias
  | ( table_references )

join_condition:
    ON equality_expression ( AND equality_expression )*

equality_expression:
    expression = expression

hive 只支持等连接(equality joins)、外连接(outer joins)、左半连接(left semi joins)。hive 不支持非相等的 join 条件,因为它很难在 map/reduce job 中实现这样的条件。而且,hive 可以 join 两个以上的表。

3. Hive例子

通过hive实现单词统计。新建一个word.txt文档,作为我们的数据文件:

vi word.txt

输入:

hello world
hello China
hello what
hello from China
hello why

创建一个数据表,并将word.txt内容导入。

# 新建表:
create table textline string;
# 载入数据:
load data local inpath '/home/hadoop/word.txt' into table text; 

注意:使用load函数时,如果从本地加载到hive表中,需要使用关键字local,它会将文件同时上传到hdfs系统中。

查看text表:

select *from text;

由于一行文本有多个单词,所以我们需要将每行的文本切割成单个的单词,可使用split函数:

select split(line,' ') from text;

每行文本已经被切割开来,但是得到的是数组类型,并不是hive能直接通过group by处理的形式,所以我们需要使用hive的另一个高级函数explode

explode这个函数的功能是行转列(俗称炸裂),也就是说将上面我们得到的数组中的每个元素生成一行。

命令如下:

select explode(split(line,' ')) from text;

对于我们炸裂出来的数据,原来的列的名称已经不再适用,我们将其取别名为word:

select explode(split(line,' '))as word from text;

接下来,我们就需要使用group by来对我们得到的炸裂开来的数据进行统计。我们需要将上面得到的结果作为另一张表t(子查询),然后对这张表进行统计,否则将会报错:

select t.word,count(*) from (select explode(split(line,' '))as word from text) as t group by t.word;

接下来我们将所有单词按照降序排列,同时只打印前三个单词。

select t.word,count(*) c from (select explode(split(line,' '))as word from text) as t group by t.word order by c desc limit 3; 

order by 默认升序,我们使用desc设置为降序,同时我们安装count(*)出来的结果作为排序依据。

在实际生产环境中,我们都需要将我们查询得到的结果存入另一张表中,以供其他人使用:

create table wordcount as select t.word,count(*) c from (select explode(split(line,' '))as word from text) as t group by t.word order by c desc limit 3; 

查看wordcount表:

desc wordcount;
select *from wordcount;

REFERENCE

baike-hive(数据仓库工具)

Hadoop Hive sql语法详解

hadoop-ecosystem