Hadoop(四):Hive

本文为学习笔记,对应视频教程来自黑马程序员Hive教程

Hive函数入门

函数概述

1
2
3
4
5
6
-- 显示所有的函数和运算符
SHOW FUNCTIONS;
-- 查看运算符或者函数的使用说明
DESCRIBE FUNCTION avg;
-- 使用extended 可以查看更加详细的使用说明
DESCRIBE FUNCTION EXTENDED avg;

内置函数

String Functions 字符串函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
-- 字符串长度函数:length(str | binary)
SELECT length("Facebook");
-- 字符串反转函数:reverse
SELECT reverse("Facebook");

-- 字符串连接函数:concat(str1, str2, ... strN)
SELECT concat("abc","def");

-- 带分隔符字符串连接函数:concat_ws(separator, [string | array(string)]+)
SELECT concat_ws('.', 'www', array('facebook', 'com'));

-- 字符串截取函数:substr(str, pos[, len]) 或者 substring(str, pos[, len])
-- pos是从1开始的索引,如果为负数则倒着数
SELECT substr("Facebook",-2);
SELECT substr("Facebook",2,2);

-- 字符串转大写函数:upper,ucase
SELECT upper("Facebook");
SELECT ucase("Facebook");

-- 字符串转小写函数:lower,lcase
SELECT lower("Facebook");
SELECT lcase("Facebook");

-- 去空格函数:trim 去除左右两边的空格
SELECT trim(" facebook ");

-- 左边去空格函数:ltrim
SELECT ltrim(" facebook ");

-- 右边去空格函数:rtrim
SELECT rtrim(" facebook ");

-- 正则表达式替换函数:regexp_replace(str, regexp, rep)
SELECT regexp_replace('100-200', '(\\d+)', 'num');

-- 正则表达式解析函数:regexp_extract(str, regexp[, idx]) 提取正则匹配到的指定组内容
SELECT regexp_extract('100-200', '(\\d+)-(\\d+)', 2);

-- URL解析函数:parse_url 注意要想一次解析出多个 可以使用parse_url_tuple这个UDTF函数
SELECT parse_url('http://www.facebook.cn/path/p1.php?query=1', 'HOST');

-- json解析函数:get_json_object
-- $代表当前JSON对象
SELECT get_json_object(
'[{"website":"www.itcast.cn","name":"allenwoon"}, {"website":"cloud.itcast.com","name":"carbondata 中文文档"}]',
'$.[1].website');

-- 空格字符串函数:space(n) 返回指定个数空格
SELECT space(2);

-- 重复字符串函数:repeat(str, n) 重复str字符串n次
SELECT repeat("123", 2);

-- 返回首字符ascii函数:ascii
-- a对应ASCII 97
SELECT ascii("apple");

-- 左补足函数:lpad
-- ???hi
SELECT lpad('hi', 5, '??');
-- h
SELECT lpad('hi', 1, '??');

-- 右补足函数:rpad
SELECT rpad('hi', 5, '??');

-- 分割字符串函数: split(str, regex)
SELECT split('oneAtwoBthreeC', '[ABC]');

-- 集合查找函数: find_in_set(str,str_array)
SELECT find_in_set('ab', 'abc,b,ab,c,def');
Date Functions 日期函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
-- 获取当前日期: current_date
SELECT current_date();

-- 获取当前时间戳: current_timestamp
-- 同一查询中对current_timestamp的所有调用均返回相同的值。
SELECT current_timestamp();

-- 获取当前UNIX时间戳函数: unix_timestamp
SELECT unix_timestamp();

-- UNIX时间戳转日期函数: from_unixtime
SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss');

-- 日期转UNIX时间戳函数: unix_timestamp
SELECT unix_timestamp("2011-12-07 13:01:03");

-- 指定格式日期转UNIX时间戳函数: unix_timestamp
SELECT unix_timestamp('20111207 13:01:03','yyyyMMdd HH:mm:ss');

-- 抽取日期函数: to_date
SELECT to_date('2009-07-30 04:17:52');

-- 日期转年函数: year
SELECT year('2009-07-30 04:17:52');

-- 日期转月函数: month
SELECT month('2009-07-30 04:17:52');

-- 日期转天函数: day
SELECT day('2009-07-30 04:17:52');

-- 日期转小时函数: hour
SELECT hour('2009-07-30 04:17:52');

-- 日期转分钟函数: minute
SELECT minute('2009-07-30 04:17:52');

-- 日期转秒函数: second
SELECT second('2009-07-30 04:17:52');

-- 日期转周函数: weekofyear 返回指定日期所示年份第几周
SELECT weekofyear('2009-07-30 04:17:52');

-- 日期比较函数: datediff 日期格式要求'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd'
SELECT datediff('2012-12-08','2012-05-09');

-- 日期增加函数: date_add
SELECT date_add('2012-02-28',10);

-- 日期减少函数: date_sub
SELECT date_sub('2012-01-1',10);
Mathematical Functions 数学函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 取整函数: round  返回double类型的整数值部分 (遵循四舍五入)
SELECT round(3.1415926);

-- 指定精度取整函数: round(double a, int d) 返回指定精度d的double类型
SELECT round(3.1415926,4);

-- 向下取整函数: floor
SELECT floor(3.1415926);
SELECT floor(-3.1415926);

-- 向上取整函数: ceil
SELECT ceil(3.1415926);
SELECT ceil(-3.1415926);

-- 取随机数函数: rand 每次执行都不一样 返回一个0到1范围内的随机数
SELECT rand();

-- 指定种子取随机数函数: rand(int seed) 得到一个稳定的随机数序列
SELECT rand(100);

-- 二进制函数: bin(BIGINT a)
SELECT bin(18);

-- 进制转换函数: conv(BIGINT num, int from_base, int to_base)
SELECT conv(17,10,16);

-- 绝对值函数: abs
SELECT abs(-3.9);
Collection Functions 集合函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 集合元素size函数: size(Map<K.V>) size(Array<T>)
SELECT size(`array`(11,22,33));
SELECT size(`map`("id",10086,"name","zhangsan","age",18));

-- 取map集合keys函数: map_keys(Map<K.V>)
SELECT map_keys(`map`("id",10086,"name","zhangsan","age",18));

-- 取map集合values函数: map_values(Map<K.V>)
SELECT map_values(`map`("id",10086,"name","zhangsan","age",18));

-- 判断数组是否包含指定元素: array_contains(Array<T>, value)
SELECT array_contains(`array`(11,22,33),11);
SELECT array_contains(`array`(11,22,33),66);

-- 数组排序函数:sort_array(Array<T>)
SELECT sort_array(`array`(12,2,32));
Conditional Functions 条件函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 使用之前课程创建好的t_student表数据
SELECT * FROM t_student LIMIT 3;

-- if条件判断: if(boolean testCondition, T valueTrue, T valueFalseOrNull)
SELECT if(1=2,100,200);
SELECT if(sex ='男','M','W') from t_student limit 3;

-- 空判断函数: isnull( a )
SELECT isnull("allen");
SELECT isnull(null);

-- 非空判断函数: isnotnull ( a )
SELECT isnotnull("allen");
SELECT isnotnull(null);

-- 空值转换函数: nvl(T value, T default_value)
SELECT nvl("allen","itcast");
SELECT nvl(null,"itcast");

-- 非空查找函数: COALESCE(T v1, T v2, ...)
-- 返回参数中的第一个非空值;如果所有值都为NULL,那么返回NULL
SELECT COALESCE(null,11,22,33);
SELECT COALESCE(null,null,null,33);
SELECT COALESCE(null,null,null);

--条件转换函数: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
SELECT CASE 100 WHEN 50 THEN 'tom' WHEN 100 THEN 'mary' ELSE 'tim' END ;
SELECT CASE sex WHEN '男' THEN 'man' ELSE 'women' END FROM t_student LIMIT 3;

-- nullif( a, b ):
-- 如果 a = b,则返回NULL;否则返回NULL。否则返回一个
SELECT nullif(11,11);
SELECT nullif(11,12);

-- assert_true(condition)
-- 如果'condition'不为真,则引发异常,否则返回null
SELECT assert_true(11 >= 0);
SELECT assert_true(-1 >= 0);
Type Conversion Functions 类型转换函数
1
2
3
-- 任意数据类型之间转换:cast
SELECT cast(12.14 as bigint);
SELECT cast(12.14 as string);
Data Masking Functions 数据脱敏函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- mask
-- 将查询回的数据,大写字母转换为X,小写字母转换为x,数字转换为n。
SELECT mask("abc123DEF");
SELECT mask("abc123DEF",'-','.','^'); --自定义替换的字母

-- mask_first_n(string str[, int n]
-- 对前n个进行脱敏替换
SELECT mask_first_n("abc123DEF",4);

-- mask_last_n(string str[, int n])
SELECT mask_last_n("abc123DEF",4);

-- mask_show_first_n(string str[, int n])
-- 除了前n个字符,其余进行掩码处理
SELECT mask_show_first_n("abc123DEF",4);

--mask_show_last_n(string str[, int n])
SELECT mask_show_last_n("abc123DEF",4);

--mask_hash(string|char|varchar str)
--返回字符串的hash编码。
SELECT mask_hash("abc123DEF");
其他函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- hive调用java方法: java_method(class, method[, arg1[, arg2..]])
SELECT java_method("java.lang.Math","max",11,22);

-- 反射函数: reflect(class, method[, arg1[, arg2..]])
SELECT reflect("java.lang.Math","max",11,22);

-- 取哈希值函数:hash
SELECT hash("allen");

--current_user()、logged_in_user()、current_database()、version()
SELECT logged_in_user();

-- SHA-1加密: sha1(string/binary)
SELECT sha1("allen");

-- SHA-2家族算法加密:sha2(string/binary, int) (SHA-224, SHA-256, SHA-384, SHA-512)
SELECT sha2("allen",224);
SELECT sha2("allen",512);

-- crc32加密:
SELECT crc32("allen");

-- MD5加密: md5(string/binary)
SELECT md5("allen");

用户自定义函数

UDF 普通函数

UDF函数通常把它叫做普通函数,最大的特点是一进一出,也就是输入一行输出一行。

UDAF 聚合函数

UDAF函数通常把它叫做聚合函数,A所代表的单词就是Aggregation聚合的意思。最大的特点是多进一出,也就是输入多行输出一行。

UDTF 表生成函数

UDTF函数通常把它叫做表生成函数,T所代表的单词是Table-Generating表生成的意思。最大的特点是一进多出,也就是输入一行输出多行。

案例:用户自定义UDF

需求描述

自定义开发实现Hive函数,对手机号机号中间4位进行****处理。

  1. 能够对输入数据进行非空判断、位数判断处理
  2. 能够实现校验手机号格式,把满足规则的进行处理
  3. 对于不符合手机号规则的数据原封不动不处理
实践
  1. 开发环境准备

    创建Maven工程,添加下述pom依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    <!-- 导入 hive 对应版本的依赖 -->
    <dependencies>
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
    </dependency>
    </dependencies>
    <!-- 打包插件 -->
    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.2</version>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>shade</goal>
    </goals>
    <configuration>
    <filters>
    <filter>
    <artifact>*:*</artifact>
    <excludes>
    <exclude>META-INF/*.SF</exclude>
    <exclude>META-INF/*.DSA</exclude>
    <exclude>META-INF/*.RSA</exclude>
    </excludes>
    </filter>
    </filters>
    </configuration>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>
  2. 编写处理逻辑代码

    写一个java类,继承UDF,并重载evaluate方法;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    package cn.itcast.hive.udf;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorConverter;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /**
    * 当Hive解析query时,会得到传入UDF参数的参数类型,并调用initialize()方法。
    * 针对该UDF的每个参数该方法都会收到一个对应的ObjectInspector参数,且该方法必须返回一个ObjectInspector表示返回值类型。
    * 通过调用该方法,Hive知道该UDF将返回什么数据类型,因此可以继续解析query。
    * 对于Hive的每行记录,我们在initialize()方法内读取ObjectInspector参数,并执行传参的数量和数据类型的检查,正确时才进行计算;
    * 在evaluate()方法中,我们使用initialize()方法里收到的ObjectInspector去读evaluate()方法接收的参数,
    * 即一串泛型Object(实际是DeferredObject),ObjectInspector解析Object并转成具体类型的对象执行数据处理,最后输出结果。
    */
    public class EncryptPhoneNumber extends GenericUDF {
    private String udfName = "ENCRYPT_PHONUM";
    private transient PrimitiveObjectInspectorConverter.TextConverter converter;
    private Text result = new Text();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 1) {
    throw new UDFArgumentException(udfName + " requires one value argument. Found :"
    + arguments.length);
    }
    PrimitiveObjectInspector argumentOI;
    if(arguments[0] instanceof PrimitiveObjectInspector) {
    argumentOI = (PrimitiveObjectInspector) arguments[0];
    } else {
    throw new UDFArgumentException(udfName + " takes only primitive types. found "
    + arguments[0].getTypeName());
    }
    switch (argumentOI.getPrimitiveCategory()) {
    case STRING:
    case CHAR:
    case VARCHAR:
    break;
    default:
    throw new UDFArgumentException(udfName + " takes only STRING/CHAR/VARCHAR types. Found "
    + argumentOI.getPrimitiveCategory());
    }
    converter = new PrimitiveObjectInspectorConverter.TextConverter(argumentOI);
    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object valObject = arguments[0].get();
    if (valObject == null) {
    return null;
    }
    String val = ((Text) converter.convert(valObject)).toString();
    if (val == null) {
    return null;
    }
    result.set(encryptPhonum(val.toString()));
    return result;
    }

    //1. 能够对输入数据进行非空判断、位数判断处理
    //2. 能够实现校验手机号格式,把满足规则的进行处理
    //3. 对于不符合手机号规则的数据原封不动不处理
    private String encryptPhonum(String phoNum) {
    String encryptPhoNum = null;
    // 手机号不为空 并且为11位
    if (StringUtils.isNotEmpty(phoNum) && phoNum.trim().length() == 11 ) {
    //判断数据是否满足中国大陆手机号码规范
    String regex = "^(1[3-9]\\d{9}$)";
    Pattern p = Pattern.compile(regex);
    Matcher m = p.matcher(phoNum);
    if (m.matches()) {//进入这里都是符合手机号规则的
    //使用正则替换 返回加密后数据
    encryptPhoNum = phoNum.trim().replaceAll("()\\d{4}(\\d{4})","$1****$2");
    }else{
    //不符合手机号规则 数据直接原封不动返回
    encryptPhoNum = phoNum;
    }
    }else{
    //不符合11位 数据直接原封不动返回
    encryptPhoNum = phoNum;
    }
    return encryptPhoNum;
    }

    @Override
    public String getDisplayString(String[] children) {
    return "This is encrypt_phonum";
    }
    }

    参考内置的GenericUDFBaseTrim

  3. 将项目打包上传到hiveserver2服务器的家目录下

    1
    2
    3
    4
    5
    6
    7
    [eitan@hadoop103 ~]$ ll
    总用量 116120
    drwxrwxr-x. 2 eitan eitan 4096 512 16:09 bin
    drwxrwxr-x. 3 eitan eitan 4096 513 10:15 documents
    -rw-rw-r--. 1 eitan eitan 118888832 5月 14 23:54 hive-example-1.0-SNAPSHOT.jar
    drwxr-xr-x. 3 eitan eitan 4096 513 21:55 hive_export
    drwxrwxr-x. 2 eitan eitan 4096 512 16:08 log
  4. 使用命令把jar包添加至classpath

    1
    ADD JAR /home/eitan/hive-example-1.0-SNAPSHOT.jar;
  5. 注册临时函数

    1
    CREATE TEMPORARY FUNCTION encrypt_phonum AS 'cn.itcast.hive.udf.EncryptPhoneNumber';
  6. 使用效果

    1
    2
    3
    4
    -- 10086
    SELECT encrypt_phonum("10086");
    -- ****5338730
    SELECT encrypt_phonum("18905338730");

Hive函数高级

explode函数

功能介绍
  • explode接收map、array类型的数据作为输入,然后把输入数据中的每个元素拆开变成一行数据,一个元素一行;
  • lexplode执行效果正好满足于输入一行输出多行,所有叫做UDTF函数。
简单使用
1
2
3
SELECT explode(`array`(11,22,33)) AS item;

SELECT explode(`map`("id",10086,"name","zhangsan","age",18));
explode使用限制及原因

限制:在select的时候,explode的旁边不支持其他字段的同时出现

原因:

  1. explode函数属于UDTF函数,即表生成函数;
  2. explode函数执行返回的结果可以理解为一张虚拟的表,其数据来源于源表;
  3. 在select中只查询源表数据没有问题,只查询explode生成的虚拟表数据也没问题
  4. 但是不能在只查询源表的时候,既想返回源表字段又想返回explode生成的虚拟表字段
  5. 通俗点讲,有两张表,不能只查询一张表但是返回分别属于两张表的字段;
  6. 从SQL层面上来说应该对两张表进行关联查询
  7. Hive专门提供了语法lateral View侧视图,专门用于搭配explode这样的UDTF函数,以满足上述需要。

Lateral View侧视图

概念及原理
  1. Lateral View是一种特殊的语法,主要用于搭配UDTF类型功能的函数一起使用,用于解决UDTF函数的一些查询限制的问题;
  2. 将UDTF的结果构建成一个类似于视图的表,然后将原表中的每一行和UDTF函数输出的每一行进行连接,生成一张新的虚拟表;
  3. 使用lateral view时也可以对UDTF产生的记录设置字段名称,产生的字段可以用于group by、order by 、limit等语句中,不需要再单独嵌套一层子查询。

image-20220515104535030

语法
1
2
--lateral view侧视图基本语法如下
SELECT …… FROM tabelA LATERAL VIEW UDTF(xxx) 别名 AS col1,col2,col3……;
案例

有一份数据《The_NBA_Championship.txt》,关于部分年份的NBA总冠军球队名单:

1
2
3
4
5
6
7
8
9
10
Chicago Bulls,1991|1992|1993|1996|1997|1998
San Antonio Spurs,1999|2003|2005|2007|2014
Golden State Warriors,1947|1956|1975|2015
Boston Celtics,1957|1959|1960|1961|1962|1963|1964|1965|1966|1968|1969|1974|1976|1981|1984|1986|2008
L.A. Lakers,1949|1950|1952|1953|1954|1972|1980|1982|1985|1987|1988|2000|2001|2002|2009|2010
Miami Heat,2006|2012|2013
Philadelphia 76ers,1955|1967|1983
Detroit Pistons,1989|1990|2004
Houston Rockets,1994|1995
New York Knicks,1970|1973

需求:使用Hive建表映射成功数据,对数据拆分,并且根据年份的倒序进行排序。

实践:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- step1 建表
CREATE TABLE the_nba_championship
(
team_name string COMMENT "队伍名称",
champion_year array<string> COMMENT "夺冠年份"
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY "|";

-- step2 上传数据|加载数据
LOAD DATA LOCAL INPATH '/home/eitan/documents/txt/The_NBA_Championship.txt' INTO TABLE the_nba_championship;

-- step3 数据拆分查询,不排序则不需要MR操作
SELECT a.team_name,b.year FROM the_nba_championship a LATERAL VIEW explode(champion_year) b AS year ORDER BY b.year DESC;

Aggregation 聚合函数

基础聚合函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
SELECT * FROM t_student;
-- 场景1:没有group by子句的聚合操作
SELECT count(*) AS cnt1,count(1) AS cnt2 FROM t_student;
SELECT count(*) AS cnt1,count(1) AS cnt2,count(sex) FROM t_student;

-- 场景2:带有group by子句的聚合操作 注意group by语法限制
SELECT sex, count(*) AS cnt FROM t_student GROUP BY sex;

-- 场景3:select时多个聚合函数一起使用
SELECT count(*) AS cnt1,avg(age) AS cnt2 FROM t_student;

-- 场景4:聚合函数和case when条件转换函数、coalesce函数、if函数使用
SELECT
sum(CASE WHEN sex = '男'THEN 1 ELSE 0 END)
FROM t_student;

SELECT
sum(if(sex = '男',1,0))
FROM t_student;

-- 聚合参数针对null的处理方式
-- null null 0
SELECT max(null), min(null), count(null);
-- 下面这两个不支持null
SELECT sum(null), avg(null);

-- 场景5:聚合操作时针对null的处理
CREATE TABLE tmp_1 (val1 int, val2 int);
INSERT INTO TABLE tmp_1 VALUES (1, 2),(null,2),(2,3);
SELECT * FROM tmp_1;
-- 第二行数据(NULL, 2) 在进行sum(val1 + val2)的时候会被忽略
SELECT sum(val1), sum(val1 + val2) FROM tmp_1;
-- 可以使用coalesce函数解决
SELECT
sum(coalesce(val1,0)),
sum(coalesce(val1,0) + val2)
FROM tmp_1;

-- 场景6:配合distinct关键字去重聚合
-- 此场景下,会编译期间会自动设置只启动一个reduce task处理数据 性能可能会不会 造成数据拥堵
SELECT count(DISTINCT sex) AS cnt1 FROM t_student;
-- 可以先去重 在聚合 通过子查询完成
-- 因为先执行distinct的时候 可以使用多个reducetask来跑数据
SELECT count(*) AS gender_uni_cnt
FROM (SELECT DISTINCT sex FROM t_student) a;

-- 案例需求:找出t_student中男女学生年龄最大的及其名字
-- 这里使用了struct来构造数据 然后针对struct应用max找出最大元素 然后取值
SELECT struct(age, name) FROM t_student;
SELECT struct(age, name).col1 FROM t_student;
SELECT max(struct(age, name)) FROM t_student;

SELECT sex,
max(struct(age, name)).col1 as age,
max(struct(age, name)).col2 as name
FROM t_student
GROUP BY sex;
增强聚合函数

增强聚合的GROUPING SETS、CUBE、ROLLUP这几个函数主要适用于OLAP多维数据分析模式中,多维分析中的指的分析问题时看待问题的维度、角度。

  1. 准备数据 字段:月份、天、用户cookieid

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    2018-03,2018-03-10,cookie1
    2018-03,2018-03-10,cookie5
    2018-03,2018-03-12,cookie7
    2018-04,2018-04-12,cookie3
    2018-04,2018-04-13,cookie2
    2018-04,2018-04-13,cookie4
    2018-04,2018-04-16,cookie4
    2018-03,2018-03-10,cookie2
    2018-03,2018-03-10,cookie3
    2018-04,2018-04-12,cookie5
    2018-04,2018-04-13,cookie6
    2018-04,2018-04-15,cookie3
    2018-04,2018-04-15,cookie2
    2018-04,2018-04-16,cookie1
  2. 表创建并加载数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE TABLE cookie_info
    (
    month STRING,
    day STRING,
    cookieid STRING
    ) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ',';

    LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/cookie_info.txt" INTO TABLE cookie_info;
  3. GROUPING SETS

    GROUPING SETS是一种将多个group by逻辑写在一个sql语句中的便利写法。等价于将不同维度的GROUP BY结果集进行UNION ALL。GROUPING__ID表示结果属于哪一个分组集合。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    SELECT month,
    day,
    COUNT(DISTINCT cookieid) AS nums,
    GROUPING__ID
    FROM cookie_info
    GROUP BY month, day
    GROUPING SETS ( month, day)
    ORDER BY GROUPING__ID;

    -- GROUPING__ID表示这一组结果属于哪个分组集合,
    -- 根据grouping sets中的分组条件month,day,1是代表month,2是代表day

    -- 等价于
    SELECT month, NULL, COUNT(DISTINCT cookieid) AS nums, 1 AS GROUPING__ID FROM cookie_info GROUP BY month
    UNION ALL
    SELECT NULL as month, day, COUNT(DISTINCT cookieid) AS nums, 2 AS GROUPING__ID FROM cookie_info GROUP BY day;

    -- 再比如
    SELECT month,
    day,
    COUNT(DISTINCT cookieid) AS nums,
    GROUPING__ID
    FROM cookie_info
    GROUP BY month, day
    GROUPING SETS ( month, day, ( month, day))
    ORDER BY GROUPING__ID;

    -- 等价于
    SELECT month, NULL, COUNT(DISTINCT cookieid) AS nums, 1 AS GROUPING__ID FROM cookie_info GROUP BY month
    UNION ALL
    SELECT NULL, day, COUNT(DISTINCT cookieid) AS nums, 2 AS GROUPING__ID FROM cookie_info GROUP BY day
    UNION ALL
    SELECT month, day, COUNT(DISTINCT cookieid) AS nums, 3 AS GROUPING__ID FROM cookie_info GROUP BY month, day;
  4. CUBE

    CUBE的语法功能指的是:根据GROUP BY的维度的所有组合进行聚合。对于CUBE,如果有n个维度,则所有组合的总个数是:2^n。比如CUBE有a,b,c3个维度,则所有组合情况是: ((a,b,c),(a,b),(b,c),(a,c),(a),(b),(c),())。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    SELECT
    month,
    day,
    COUNT(DISTINCT cookieid) AS nums,
    GROUPING__ID
    FROM cookie_info
    GROUP BY month,day
    WITH CUBE
    ORDER BY GROUPING__ID;

    -- 等价于
    SELECT NULL,NULL,COUNT(DISTINCT cookieid) AS nums,0 AS GROUPING__ID FROM cookie_info
    UNION ALL
    SELECT month,NULL,COUNT(DISTINCT cookieid) AS nums,1 AS GROUPING__ID FROM cookie_info GROUP BY month
    UNION ALL
    SELECT NULL,day,COUNT(DISTINCT cookieid) AS nums,2 AS GROUPING__ID FROM cookie_info GROUP BY day
    UNION ALL
    SELECT month,day,COUNT(DISTINCT cookieid) AS nums,3 AS GROUPING__ID FROM cookie_info GROUP BY month,day;
  5. ROLLUP

    CUBE的语法功能指的是:根据GROUP BY的维度的所有组合进行聚合。

    ROLLUP是Cube的子集,以最左侧的维度为主,从该维度进行层级聚合。比如ROLLUP有a,b,c3个维度,则所有组合情况是:((a,b,c),(a,b),(a),())。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    --rollup-------------
    --比如,以month维度进行层级聚合:
    SELECT
    month,
    day,
    COUNT(DISTINCT cookieid) AS nums,
    GROUPING__ID
    FROM cookie_info
    GROUP BY month,day
    WITH ROLLUP
    ORDER BY GROUPING__ID;

    --把month和day调换顺序,则以day维度进行层级聚合:
    SELECT
    day,
    month,
    COUNT(DISTINCT cookieid) AS uv,
    GROUPING__ID
    FROM cookie_info
    GROUP BY day,month
    WITH ROLLUP
    ORDER BY GROUPING__ID;

Window functions 窗口函数

概述
  1. 窗口函数(Window functions)也叫做开窗函数、OLAP函数,其最大特点是:输入值是从SELECT语句的结果集中的一行或多行的“窗口”中获取的。
  2. 如果函数具有OVER子句,则它是窗口函数。
  3. 窗口函数可以简单地解释为类似于聚合函数的计算函数,但是通过GROUP BY子句组合的常规聚合会隐藏正在聚合的各个行,最终输出一行,窗口函数聚合后还可以访问当中的各个行,并且可以将这些行中的某些属性添加到结果集中。
语法
1
Function(arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>])
  • 其中Function(arg1,…, argn) 可以是下面分类中的任意一个

    • 聚合函数:比如sum max avg等
    • 排序函数:比如rank row_number等
    • 分析函数:比如lead lag first_value等
  • OVER [PARTITION BY <…>] 类似于group by 用于指定分组 每个分组你可以把它叫做窗口

  • 如果没有PARTITION BY 那么整张表的所有行就是一组

  • [ORDER BY <….>] 用于指定每个分组内的数据排序规则 支持ASC、DESC

  • [] 用于指定每个窗口中 操作的数据范围 默认是窗口中所有行

实践
  1. 建表并导入数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    CREATE TABLE website_pv_info
    (
    cookieid string,
    createtime string,
    pv int
    ) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ',';

    CREATE TABLE website_url_info
    (
    cookieid string,
    createtime string, --访问时间
    url string --访问页面
    ) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ',';

    LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/website_pv_info.txt" INTO TABLE website_pv_info;
    LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/website_url_info.txt" INTO TABLE website_url_info;
  2. 窗口聚合函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    -- 1、求出每个用户总pv数  sum+group by普通常规聚合操作
    SELECT cookieid,sum(pv) AS total_pv FROM website_pv_info GROUP BY cookieid;

    -- 2、sum+窗口函数 总共有四种用法 注意是整体聚合 还是累积聚合
    -- sum(...) over( )对表所有行求和
    -- sum(...) over( order by ... ) 连续累积求和
    -- sum(...) over( partition by... ) 同组内所有行求和
    -- sum(...) over( partition by... order by ... ) 在每个分组内,连续累积求和

    -- 需求:求出网站总的pv数 所有用户所有访问加起来
    -- sum(...) over( )对表所有行求和
    SELECT cookieid,createtime,pv,
    sum(pv) OVER() AS total_pv FROM website_pv_info;

    -- 需求:求出每个用户总pv数
    -- sum(...) over( partition by... ),同组内所行求和
    SELECT cookieid,createtime,pv,
    sum(pv) OVER (PARTITION BY cookieid) AS total_pv FROM website_pv_info;

    -- 需求:求出每个用户截止到当天,累积的总pv数
    -- sum(...) over( partition by... order by ... ),在每个分组内,连续累积求和
    SELECT cookieid,createtime,pv,
    sum(pv) OVER(PARTITION BY cookieid ORDER BY createtime) AS current_total_pv
    FROM website_pv_info;
  3. 窗口表达式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    关键字是rows between,包括下面这几个选项
    - preceding:往前
    - following:往后
    - current row:当前行
    - unbounded:边界
    - unbounded preceding 表示从前面的起点
    - unbounded following:表示到后面的终点

    -- 第一行到当前行
    SELECT cookieid,createtime,pv,
    sum(pv) OVER (PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS pv2
    FROM website_pv_info;

    -- 向前3行至当前行
    SELECT cookieid,createtime,pv,
    sum(pv) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW ) AS pv4
    from website_pv_info;

    -- 向前3行 向后1行
    SELECT cookieid,createtime,pv,
    sum(pv) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS pv5
    FROM website_pv_info;

    --当前行至最后一行
    SELECT cookieid,createtime,pv,
    sum(pv) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS pv6
    FROM website_pv_info;

    --第一行到最后一行 也就是分组内的所有行
    SELECT cookieid,createtime,pv,
    sum(pv) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS pv6
    FROM website_pv_info;
  4. 窗口排序函数

    窗口排序函数用于给每个分组内的数据打上排序的标号。注意窗口排序函数不支持窗口表达式。总共有4个函数需要掌握:

    1. row_number:在每个分组中,为每行分配一个从1开始的唯一序列号,递增,不考虑重复;

    2. rank: 在每个分组中,为每行分配一个从1开始的序列号,考虑重复,挤占后续位置;

    3. dense_rank: 在每个分组中,为每行分配一个从1开始的序列号,考虑重复,不挤占后续位置;

    4. ntile:将每个分组内的数据分为指定的若干个桶里(分为若干个部分),并且为每一个桶分配一个桶编号。如果不能平均分配,则优先分配较小编号的桶,并且各个桶中能放的行数最多相差1。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    SELECT
    cookieid,
    createtime,
    pv,
    RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn1,
    DENSE_RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn2,
    ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn3
    FROM website_pv_info
    WHERE cookieid = 'cookie1';

    -- 需求:找出每个用户访问pv最多的是哪天的Top3 重复并列的不考虑
    SELECT * FROM
    (SELECT cookieid,
    createtime,
    pv,
    ROW_NUMBER() OVER (PARTITION BY cookieid ORDER BY pv DESC) AS seq FROM website_pv_info) tmp
    WHERE tmp.seq < 4;

    -- 把每个分组内的数据分为3桶 NTILE
    SELECT
    cookieid,
    createtime,
    pv,
    NTILE(3) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn2
    FROM website_pv_info
    ORDER BY cookieid,createtime;

    -- 需求:统计每个用户pv数最多的前3分之1天。
    -- 理解:将数据根据cookieid分 根据pv倒序排序 排序之后分为3个部分 取第一部分
    SELECT * FROM
    (SELECT
    cookieid,
    createtime,
    pv,
    NTILE(3) OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn FROM website_pv_info) tmp
    WHERE rn = 1;
  5. 窗口分析函数

    • LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值

      第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL);

    • LEAD(col,n,DEFAULT) 用于统计窗口内往下第n行值

      第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL);

    • FIRST_VALUE 取分组内排序后,截止到当前行,第一个值;

    • LAST_VALUE 取分组内排序后,截止到当前行,最后一个值;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    -- LAG
    SELECT cookieid,
    createtime,
    url,
    ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
    LAG(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS last_1_time,
    LAG(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS last_2_time
    FROM website_url_info;


    -- LEAD
    SELECT cookieid,
    createtime,
    url,
    ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
    LEAD(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS next_1_time,
    LEAD(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS next_2_time
    FROM website_url_info;

    -- FIRST_VALUE
    SELECT cookieid,
    createtime,
    url,
    ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
    FIRST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS first1
    FROM website_url_info;

    -- LAST_VALUE
    SELECT cookieid,
    createtime,
    url,
    ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,
    LAST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS last1
    FROM website_url_info;

Sampling 抽样函数

Random随机抽样

随机抽样使用rand()函数和LIMIT关键字来获取数据。 使用了DISTRIBUTE和SORT关键字,可以确保数据也随机分布在mapper和reducer之间,使得底层执行有效率。

ORDER BY 和rand()语句也可以达到相同的目的,但是表现不好。因为ORDER BY是全局排序,只会启动运行一个Reducer。

1
2
3
4
5
6
-- 需求:随机抽取2个学生的情况进行查看
SELECT * FROM t_student
DISTRIBUTE BY rand() SORT BY rand() LIMIT 2;

-- 使用order by+rand也可以实现同样的效果 但是效率不高
SELECT * FROM t_student ORDER BY rand() LIMIT 2;
Block块抽样

Block块采样允许select随机获取n行数据,即数据大小或n个字节的数据。采样粒度是HDFS块大小。

1
2
3
4
5
6
7
8
9
10
---block抽样
--根据行数抽样
SELECT * FROM t_student TABLESAMPLE(1 ROWS);

--根据数据大小百分比抽样
SELECT * FROM t_student TABLESAMPLE(50 PERCENT);

--根据数据大小抽样
--支持数据单位 b/B, k/K, m/M, g/G
SELECT * FROM t_student TABLESAMPLE(1b);
Bucket table分桶表抽样

这是一种特殊的采样方法,针对分桶表进行了优化。优点是既随机速度也很快。

语法如下:

1
TABLESAMPLE (BUCKET x OUT OF y [ON colname])
  1. y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。
    例如,table总共分了4份(4个bucket),当y=2时,抽取(4/2=)2个bucket的数据,当y=8时,抽取(4/8=)1/2个bucket的数据
  2. x表示从哪个bucket开始抽取。
    例如,table总bucket数为4,tablesample(bucket 4 out of 4),表示总共抽取(4/4=)1个bucket的数据,抽取第4个bucket的数据。
    注意:x的值必须小于等于y的值,否则FAILED:Numerator should not be bigger than denominator in sample clause for table stu_buck
  3. ON colname表示基于什么抽
    ON rand()表示随机抽
    ON 分桶字段 表示基于分桶字段抽样 效率更高 推荐
1
2
3
4
5
6
7
---bucket table抽样
--根据整行数据进行抽样
SELECT * FROM t_usa_covid19_bucket TABLESAMPLE(BUCKET 1 OUT OF 2 ON rand());

--根据分桶字段进行抽样 效率更高
describe formatted t_usa_covid19_bucket;
SELECT * FROM t_usa_covid19_bucket TABLESAMPLE(BUCKET 1 OUT OF 2 ON state);

Hive函数应用案例

多字节分隔符

应用场景

Hive中默认使用单字节分隔符来加载文本数据,但在实际工作中,我们遇到的数据往往不是非常规范化的数据,我们会遇到以下的两种情况:

  1. 每一行数据的分隔符是多字节分隔符,例如:”||”、“–”等

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    01||周杰伦||中国||台湾||||七里香
    02||刘德华||中国||香港||||笨小孩
    03||汪 峰||中国||北京||||光明
    04||朴 树||中国||北京||||那些花儿
    05||许 巍||中国||陕西||||故乡
    06||张靓颖||中国||四川||||画心
    07||黄家驹||中国||香港||||光辉岁月
    08||周传雄||中国||台湾||||青花
    09||刘若英||中国||台湾||||很爱很爱你
    10||张 杰||中国||四川||||天下
  2. 数据的字段中包含了分隔符

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    192.168.88.134 [08/Nov/2020:10:44:32 +0800] "GET / HTTP/1.1" 404 951
    192.168.88.100 [08/Nov/2020:10:44:33 +0800] "GET /hpsk_sdk/index.html HTTP/1.1" 200 328
    192.168.88.134 [08/Nov/2020:20:19:06 +0800] "GET / HTTP/1.1" 404 951
    192.168.88.100 [08/Nov/2020:20:19:13 +0800] "GET /hpsk_sdk/demo4.html HTTP/1.1" 200 982
    192.168.88.100 [08/Nov/2020:20:19:13 +0800] "GET /hpsk_sdk/js/analytics.js HTTP/1.1" 200 11095
    192.168.88.100 [08/Nov/2020:20:19:23 +0800] "GET /hpsk_sdk/demo3.html HTTP/1.1" 200 1024
    192.168.88.100 [08/Nov/2020:20:19:26 +0800] "GET /hpsk_sdk/demo2.html HTTP/1.1" 200 854
    192.168.88.100 [08/Nov/2020:20:19:27 +0800] "GET /hpsk_sdk/demo.html HTTP/1.1" 200 485
    192.168.88.134 [08/Nov/2020:20:26:51 +0800] "GET / HTTP/1.1" 404 951
    192.168.88.134 [08/Nov/2020:20:29:08 +0800] "GET / HTTP/1.1" 404 951
    192.168.88.100 [08/Nov/2020:20:31:27 +0800] "GET /hpsk_sdk/demo5.html HTTP/1.1" 200 5333
    192.168.88.100 [08/Nov/2020:20:32:59 +0800] "GET /hpsk_sdk/demo5.html HTTP/1.1" 200 5333
    192.168.88.100 [08/Nov/2020:20:32:59 +0800] "GET /hpsk_sdk/js/analytics.js HTTP/1.1" 200 11082
    192.168.88.100 [08/Nov/2020:20:32:59 +0800] "GET /favicon.ico HTTP/1.1" 404 973
    192.168.88.100 [08/Nov/2020:20:33:01 +0800] "GET /hpsk_sdk/demo3.html HTTP/1.1" 200 1024
    192.168.88.100 [08/Nov/2020:20:34:25 +0800] "GET /hpsk_sdk/demo2.html HTTP/1.1" 200 854
    192.168.88.100 [08/Nov/2020:20:34:25 +0800] "GET /hpsk_sdk/js/analytics.js HTTP/1.1" 304 -
    192.168.88.100 [08/Nov/2020:20:34:28 +0800] "GET /hpsk_sdk/demo4.html HTTP/1.1" 200 982
    192.168.88.100 [08/Nov/2020:20:35:05 +0800] "GET /hpsk_sdk/demo.html HTTP/1.1" 200 485
    192.168.88.100 [08/Nov/2020:20:35:05 +0800] "GET /hpsk_sdk/js/analytics.js HTTP/1.1" 304 -
解决方案一:替换分割符

如果数据中的分隔符是多字节分隔符,可以使用程序提前将数据中的多字节分隔符替换为单字节分隔符,然后使用Hive加载,就可以实现正确加载对应的数据。

  1. 创建Maven工程,编写对应MapReduce代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.2</version>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.2</version>
    <configuration>
    <!--只包含该项目代码中用到的jar,在父项目中引入了,但在当前模块中没有用到就会被删掉-->
    <minimizeJar>true</minimizeJar>
    </configuration>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>shade</goal>
    </goals>
    <configuration>
    <filters>
    <filter>
    <artifact>*:*</artifact>
    <excludes>
    <exclude>META-INF/*.SF</exclude>
    <exclude>META-INF/*.DSA</exclude>
    <exclude>META-INF/*.RSA</exclude>
    </excludes>
    </filter>
    </filters>
    </configuration>
    </execution>
    </executions>
    </plugin>
    </plugins>
    </build>

    </project>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package cn.itcast.hadoop.changesplit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
    * 实现Tool接口,目的是为了 hadoop 命令调用保证可以传入一些与程序入参无关的命令行参数
    * Configured的getConf获取环境变量实例
    */
    public class ChangeSplitCharMR extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
    // 1. 构建Job
    Job job = Job.getInstance(this.getConf(), "changeSplit");
    job.setJarByClass(ChangeSplitCharMR.class);
    // 2. 配置Job
    // input
    job.setInputFormatClass(TextInputFormat.class);
    Path inputPath = new Path(strings[0]);
    FileInputFormat.setInputPaths(job, inputPath);
    // map
    job.setMapperClass(ChangeSplitMapper.class);
    // 设置输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // reduce:不需要 Reduce 过程
    job.setNumReduceTasks(0);
    // output
    job.setOutputFormatClass(TextOutputFormat.class);
    Path outputPath = new Path(strings[1]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // 提交 job
    return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args);
    System.exit(status);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package cn.itcast.hadoop.changesplit;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class ChangeSplitMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text outputKey = new Text();
    private NullWritable outputValue = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    outputKey.set(line.replace("||","|"));
    context.write(outputKey,outputValue);
    }
    }
  2. 打包并上传

    修改不带依赖的jar包为change-split.jar,并拷贝到虚拟机中

  3. 运行hadoop程序进行数据清洗

    1
    2
    3
    4
    [eitan@hadoop103 ~]$ hadoop fs -put documents/txt/test01.txt /tmp/hive_export


    [eitan@hadoop103 ~]$ hadoop jar change-split.jar cn.itcast.hadoop.changesplit.ChangeSplitCharMR /tmp/hive_export/test01.txt /tmp/hive_export/test01_wash.txt
  4. 结果

    image-20220516141538027

解决方案二:RegexSerDe正则加载

RegexSerde是Hive中专门为了满足复杂数据场景所提供的正则加载和解析数据的接口,使用RegexSerde可以指定正则表达式加载数据,根据正则表达式匹配每一列数据。

  1. 分析数据

    1
    192.168.88.100 [08/Nov/2020:10:44:33 +0800] "GET /hpsk_sdk/index.html HTTP/1.1" 200 328
  2. 正则表达式定义每一列

    1
    ([^ ]*) ([^}]*) ([^ ]*) ([^ ]*) ([^ ]*) ([0-9]*) ([^ ]*)
  3. 创建表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    --创建表
    CREATE TABLE apachelog
    (
    ip string, --IP地址
    stime string, --时间
    mothed string, --请求方式
    url string, --请求地址
    policy string, --请求协议
    stat string, --请求状态
    body string --字节大小
    )
    --指定使用RegexSerde加载数据
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    --指定正则表达式
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^}]*) ([^ ]*) ([^ ]*) ([^ ]*) ([0-9]*) ([^ ]*)"
    );

    LOAD DATA INPATH "/tmp/hive_export/apache_web_access.log" INTO TABLE apachelog;

    SELECT * FROM apachelog;
解决方案三:自定义InputFormat

暂无手动实现。

URL解析函数及侧视图

数据准备
1
2
3
4
1	http://facebook.com/path/p1.php?query=1
2 http://tongji.baidu.com/news/index.jsp?uuid=allen&age=18
3 http://www.jdwz.com/index?source=baidu
4 http://www.itcast.cn/index?source=alibaba
1
2
3
4
5
6
7
8
CREATE TABLE IF NOT EXISTS t_url
(
id int,
url string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/url.txt" INTO TABLE t_url;
parse_url
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SELECT id,
parse_url(url, "PROTOCOL") AS protocol,
parse_url(url, "HOST") AS host,
parse_url(url, "PATH") AS path,
parse_url(url, "QUERY") AS query
FROM t_url;


+-----+-----------+-------------------+------------------+--------------------+
| id | protocol | host | path | query |
+-----+-----------+-------------------+------------------+--------------------+
| 1 | http | facebook.com | /path/p1.php | query=1 |
| 2 | http | tongji.baidu.com | /news/index.jsp | uuid=allen&age=18 |
| 3 | http | www.jdwz.com | /index | source=baidu |
| 4 | http | www.itcast.cn | /index | source=alibaba |
+-----+-----------+-------------------+------------------+--------------------+

缺点:parse_url函数每次只能解析一个参数,导致需要经过多个函数调用才能构建多列,开发角度较为麻烦,实现过程性能也相对较差,需要对同一列做多次计算处理,我们希望能实现调用一次函数,就可以将多个参数进行解析,得到多列结果

parse_url_tuple
1
2
3
4
5
6
7
SELECT a.id,
b.protocol,
b.host,
b.path,
b.query
FROM t_url a
LATERAL VIEW parse_url_tuple(url, "PROTOCOL", "HOST", "PATH", "QUERY") b AS protocol, host, path, query;

行列转换应用

行转列:多行转多列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- 建表并导入数据
CREATE TABLE row2singlecol
(
col1 string,
col2 string,
col3 int
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/r2c1.txt" INTO TABLE row2singlecol;

-- 数据展示
SELECT * FROM row2singlecol;
+---------------------+---------------------+---------------------+
| row2singlecol.col1 | row2singlecol.col2 | row2singlecol.col3 |
+---------------------+---------------------+---------------------+
| a | c | 1 |
| a | d | 2 |
| a | e | 3 |
| b | c | 4 |
| b | d | 5 |
| b | e | 6 |
+---------------------+---------------------+---------------------+

-- 多行转多列
SELECT col1,
max(CASE WHEN col2 = "c" THEN col3 END) AS c,
max(CASE WHEN col2 = "d" THEN col3 END) AS d,
max(CASE WHEN col2 = "e" THEN col3 END) AS e
FROM row2singlecol GROUP BY col1;
+-------+----+----+----+
| col1 | c | d | e |
+-------+----+----+----+
| a | 1 | 2 | 3 |
| b | 4 | 5 | 6 |
+-------+----+----+----+
行转列:多行转单列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
-- 建表并导入数据
CREATE TABLE row2multicol
(
col1 string,
col2 string,
col3 int
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/r2c2.txt" INTO TABLE row2multicol;

-- 数据展示
SELECT * FROM row2multicol;
+--------------------+--------------------+--------------------+
| row2multicol.col1 | row2multicol.col2 | row2multicol.col3 |
+--------------------+--------------------+--------------------+
| a | b | 1 |
| a | b | 2 |
| a | b | 3 |
| c | d | 4 |
| c | d | 5 |
| c | d | 6 |
+--------------------+--------------------+--------------------+


-- 多行转单列
SELECT
col1,
col2,
concat_ws(',', collect_list(cast(col3 as string))) AS col3
FROM row2multicol
GROUP BY col1, col2;
+-------+-------+--------+
| col1 | col2 | col3 |
+-------+-------+--------+
| a | b | 1,2,3 |
| c | d | 4,5,6 |
+-------+-------+--------+
列转行:多列转多行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
-- 建表并导入数据
CREATE TABLE col2multirow
(
col1 string,
col2 int,
col3 int,
col4 int
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/c2r2.txt" INTO TABLE col2multirow;

-- 多列转单行
SELECT * FROM col2multirow;
+--------------------+--------------------+--------------------+--------------------+
| col2multirow.col1 | col2multirow.col2 | col2multirow.col3 | col2multirow.col4 |
+--------------------+--------------------+--------------------+--------------------+
| a | 1 | 2 | 3 |
| b | 4 | 5 | 6 |
+--------------------+--------------------+--------------------+--------------------+

SELECT col1,"a" AS col2,col2 AS col3 FROM col2multirow
UNION ALL
SELECT col1,"b" AS col2,col3 AS col3 FROM col2multirow
UNION ALL
SELECT col1,"c" AS col2,col4 AS col3 FROM col2multirow;
+-----------+-----------+-----------+
| _u1.col1 | _u1.col2 | _u1.col3 |
+-----------+-----------+-----------+
| a | a | 1 |
| a | b | 2 |
| a | c | 3 |
| b | a | 4 |
| b | b | 5 |
| b | c | 6 |
+-----------+-----------+-----------+
列转行:单列转多行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- 建表并导入数据
CREATE TABLE singlecol2row
(
col1 string,
col2 string,
col3 string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t";

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/c2r2.txt" INTO TABLE singlecol2row;

-- 单列转多行
SELECT * FROM singlecol2row;
+---------------------+---------------------+---------------------+
| singlecol2row.col1 | singlecol2row.col2 | singlecol2row.col3 |
+---------------------+---------------------+---------------------+
| a | b | 1,2,3 |
| c | d | 4,5,6 |
+---------------------+---------------------+---------------------+

SELECT a.col1,
a.col2,
b.col3
FROM singlecol2row a
LATERAL VIEW explode(split(col3, ",")) b AS col3;
+---------+---------+---------+
| a.col1 | a.col2 | b.col3 |
+---------+---------+---------+
| a | b | 1 |
| a | b | 2 |
| a | b | 3 |
| c | d | 4 |
| c | d | 5 |
| c | d | 6 |
+---------+---------+---------+

JSON数据处理

需求

将json文件每一个字段解析成对应的表字段

方法一:使用JSON解析函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- 建表并导入数据
CREATE TABLE t_json_method1
(
json string
);

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/device.json" INTO TABLE t_json_method1;

-- 查询
SELECT json_tuple(json, "device", "deviceType", "signal", "time")
AS (device, deviceType, signal, stime)
FROM t_json_method1;

+------------+-------------+---------+----------------+
| device | devicetype | signal | stime |
+------------+-------------+---------+----------------+
| device_30 | kafka | 98.0 | 1616817201390 |
| device_40 | route | 99.0 | 1616817201887 |
| device_21 | bigdata | 77.0 | 1616817202142 |
| device_31 | kafka | 98.0 | 1616817202405 |
| device_20 | bigdata | 12.0 | 1616817202513 |
| device_54 | bigdata | 14.0 | 1616817202913 |
| device_10 | db | 39.0 | 1616817203356 |
| device_94 | bigdata | 59.0 | 1616817203771 |
| device_32 | kafka | 52.0 | 1616817204010 |
| device_21 | bigdata | 85.0 | 1616817204229 |
| device_74 | bigdata | 27.0 | 1616817204720 |
| device_91 | bigdata | 50.0 | 1616817205164 |
| device_62 | db | 89.0 | 1616817205328 |
| device_21 | bigdata | 25.0 | 1616817205457 |
| device_76 | bigdata | 62.0 | 1616817205984 |
| device_74 | bigdata | 44.0 | 1616817206571 |
| device_42 | route | 43.0 | 1616817206681 |
| device_32 | kafka | 65.0 | 1616817207131 |
| device_32 | kafka | 95.0 | 1616817207714 |
| device_71 | bigdata | 45.0 | 1616817207907 |
| device_32 | kafka | 81.0 | 1616817208320 |
| device_10 | db | 81.0 | 1616817208907 |
| device_20 | bigdata | 69.0 | 1616817209287 |
| device_61 | db | 98.0 | 1616817209785 |
| device_30 | kafka | 95.0 | 1616817210104 |
| device_43 | route | 57.0 | 1616817210540 |
| device_10 | db | 36.0 | 1616817211134 |
| device_20 | bigdata | 75.0 | 1616817211248 |
| device_64 | db | 68.0 | 1616817211812 |
| device_53 | bigdata | 60.0 | 1616817212237 |
| device_52 | bigdata | 57.0 | 1616817212709 |
| device_30 | kafka | 75.0 | 1616817213073 |
| device_31 | kafka | 83.0 | 1616817213614 |
| device_93 | bigdata | 54.0 | 1616817214101 |
| device_20 | bigdata | 84.0 | 1616817214639 |
+------------+-------------+---------+----------------+
方法二:指定解析JSON的SerDe
1
2
3
4
5
6
7
8
9
10
11
12
13
-- 创建表并指定序列化和反序列化的类
CREATE TABLE t_json_method2
(
device string,
deviceType string,
signal double,
`time` string
) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/device.json" INTO TABLE t_json_method2;

SELECT * FROM t_json_method2;

窗口函数应用实例

连续登陆用户
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
-- 建表并加载数据
CREATE TABLE t_login
(
userid string,
logintime string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/login.log" INTO TABLE t_login;

SELECT * FROM t_login;
+-----------------+--------------------+
| t_login.userid | t_login.logintime |
+-----------------+--------------------+
| A | 2021-03-22 |
| B | 2021-03-22 |
| C | 2021-03-22 |
| A | 2021-03-23 |
| C | 2021-03-23 |
| A | 2021-03-24 |
| B | 2021-03-24 |
+-----------------+--------------------+

-- 连续两天登入的用户
WITH t1 AS (
SELECT userid,
logintime,
lead(logintime, 1) OVER (PARTITION BY userid ORDER BY logintime) as nexttime
FROM t_login
)
SELECT DISTINCT userid FROM t1 WHERE date_add(logintime, 1) == nexttime;

-- 连续三天登入的用户
WITH t1 AS (
SELECT userid,
date_add(logintime, 2) AS nextday,
lead(logintime, 2) OVER (PARTITION BY userid ORDER BY logintime) as nextlogin
FROM t_login
)
SELECT DISTINCT userid FROM t1 WHERE nextday = nextlogin;
级联累加求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
-- 建表加载数据
CREATE TABLE t_money
(
userid string,
mth string,
money int
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t";

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/money.tsv" INTO TABLE t_money;

SELECT * FROM t_money;
+-----------------+--------------+----------------+
| t_money.userid | t_money.mth | t_money.money |
+-----------------+--------------+----------------+
| A | 2021-01 | 5 |
| A | 2021-01 | 15 |
| B | 2021-01 | 5 |
| A | 2021-01 | 8 |
| B | 2021-01 | 25 |
| A | 2021-01 | 5 |
| A | 2021-02 | 4 |
| A | 2021-02 | 6 |
| B | 2021-02 | 10 |
| B | 2021-02 | 5 |
| A | 2021-03 | 7 |
| B | 2021-03 | 9 |
| A | 2021-03 | 11 |
| B | 2021-03 | 6 |
+-----------------+--------------+----------------+

-- 查询每月消费金额和累计消费金额
WITH t1 AS (
SELECT userid,
mth,
sum(money) AS m_money
FROM t_money GROUP BY userid, mth
)
SELECT userid, mth, m_money, sum(m_money) OVER (PARTITION BY userid ORDER BY mth) FROM t1;
+---------+----------+----------+---------------+
| userid | mth | m_money | sum_window_0 |
+---------+----------+----------+---------------+
| A | 2021-01 | 33 | 33 |
| A | 2021-02 | 10 | 43 |
| A | 2021-03 | 18 | 61 |
| B | 2021-01 | 30 | 30 |
| B | 2021-02 | 15 | 45 |
| B | 2021-03 | 15 | 60 |
+---------+----------+----------+---------------+
分组TopN
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
-- 创建表并载入数据
CREATE TABLE t_emp
(
empno string,
ename string,
job string,
managerid string,
hiredate string,
salary double,
bonus double,
deptno string
) ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/emp.txt" INTO TABLE t_emp;

SELECT empno,ename,salary,deptno FROM t_emp;
+--------+---------+---------+---------+
| empno | ename | salary | deptno |
+--------+---------+---------+---------+
| 7369 | SMITH | 800.0 | 20 |
| 7499 | ALLEN | 1600.0 | 30 |
| 7521 | WARD | 1250.0 | 30 |
| 7566 | JONES | 2975.0 | 20 |
| 7654 | MARTIN | 1250.0 | 30 |
| 7698 | BLAKE | 2850.0 | 30 |
| 7782 | CLARK | 2450.0 | 10 |
| 7788 | SCOTT | 3000.0 | 20 |
| 7839 | KING | 5000.0 | 10 |
| 7844 | TURNER | 1500.0 | 30 |
| 7876 | ADAMS | 1100.0 | 20 |
| 7900 | JAMES | 950.0 | 30 |
| 7902 | FORD | 3000.0 | 20 |
| 7934 | MILLER | 1300.0 | 10 |
+--------+---------+---------+---------+

-- 查询每个部分薪水前2名
WITH t1 AS (
SELECT empno,
ename,
salary,
deptno,
row_number() OVER (PARTITION BY deptno ORDER BY salary DESC) AS rn
FROM t_emp
)
SELECT * FROM t1 WHERE rn < 3;
+-----------+-----------+------------+------------+--------+
| t1.empno | t1.ename | t1.salary | t1.deptno | t1.rn |
+-----------+-----------+------------+------------+--------+
| 7839 | KING | 5000.0 | 10 | 1 |
| 7782 | CLARK | 2450.0 | 10 | 2 |
| 7788 | SCOTT | 3000.0 | 20 | 1 |
| 7902 | FORD | 3000.0 | 20 | 2 |
| 7698 | BLAKE | 2850.0 | 30 | 1 |
| 7499 | ALLEN | 1600.0 | 30 | 2 |
+-----------+-----------+------------+------------+--------+

拉链表的设计与实现

功能与应用场景

拉链表专门用于解决在数据仓库中数据发生变化如何实现数据存储的问题,如果直接覆盖历史状态,会导致无法查询历史状态,如果将所有数据单独切片存储,会导致存储大量非更新数据的问题。

拉链表的设计是将更新的数据进行状态记录,没有发生更新的数据不进行状态存储,用于存储所有数据在不同时间上的所有状态,通过时间进行标记每个状态的生命周期,查询时,根据需求可以获取指定时间范围状态的数据,默认用9999-12-31等最大值来表示最新状态。

拉链表实现
  1. 创建表并插入数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    CREATE TABLE dw_zipper
    (
    userid string,
    phone string,
    nick string,
    gender int,
    addr string,
    starttime string,
    endtime string
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

    LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/zipper.txt" INTO TABLE dw_zipper;

    SELECT * FROM dw_zipper;

    image-20220518172133066

  2. 创建ods层增量表,模拟增量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    CREATE TABLE ods_zipper_update
    (
    userid string,
    phone string,
    nick string,
    gender int,
    addr string,
    starttime string,
    endtime string
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

    LOAD DATA LOCAL INPATH "/home/eitan/documents/txt/update.txt" INTO TABLE ods_zipper_update;

    SELECT * FROM ods_zipper_update;

    image-20220518172817709

  3. 合并数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    -- 创建零时表
    CREATE TABLE tmp_zipper
    (
    userid string,
    phone string,
    nick string,
    gender int,
    addr string,
    starttime string,
    endtime string
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

    -- 合并拉链表与增量表
    INSERT OVERWRITE TABLE tmp_zipper
    SELECT userid,
    phone,
    nick,
    gender,
    addr,
    starttime,
    endtime
    FROM ods_zipper_update
    UNION ALL
    SELECT a.userid,
    a.phone,
    a.nick,
    a.gender,
    a.addr,
    a.starttime,
    if(b.userid IS NULL OR a.endtime < "9999-12-31", a.endtime, date_sub(b.starttime, 1)) AS endtime
    FROM dw_zipper a
    LEFT JOIN ods_zipper_update b ON a.userid = b.userid;
  4. 覆盖拉链表

    1
    2
    INSERT OVERWRITE TABLE dw_zipper
    SELECT * FROM tmp_zipper ORDER BY userid;

Hive性能优化

Hive表设计优化

分区表

根据查询的需求,将数据按照查询的条件【一般都以时间】进行划分分区存储,将不同分区的数据单独使用一个HDFS目录来进行存储,当底层实现计算时,根据查询的条件,只读取对应分区的数据作为输入,减少不必要的数据加载,提高程序的性能。

分桶表

分桶表的设计有别于分区表的设计,分区表是将数据划分不同的目录进行存储,而分桶表是将数据划分不同的文件进行存储。分桶表的设计是按照一定的规则【通过MapReduce中的多个Reduce来实现】将数据划分到不同的文件中进行存储,构建分桶表。

如果有两张表按照相同的划分规则【按照Join的关联字段】将各自的数据进行划分,在Join时,就可以实现Bucket与Bucket的Join,避免不必要的比较。

Hive表数据优化

文件格式
文件格式 优点 缺点 应用场景
TextFIle(默认) 1. 最简单的数据格式,不需要经过处理,可以直接查看
2. 可以使用任意的分隔符进行分割
3. 可以搭配Gzip、Bzip2、Snappy等压缩一起使用
1. 耗费存储空间,I/O性能较低
2. 结合压缩时Hive不进行数据切分合并,不能进行并行操作,查询效率低
3. 按行存储,读取列的性能差
1. 适合于小量数据的存储查询
2. 一般用于做第一层数据加载和测试使用
SequenceFile 1. 以二进制的KV形式存储数据,与底层交互更加友好,性能更快
2. 可压缩、可分割,优化磁盘利用率和I/O
3. 可并行操作数据,查询效率高
4. SequenceFile也可以用于存储多个小文件
1. 存储空间消耗最大
2. 与非Hadoop生态系统之外的工具不兼容
3. n 构建SequenceFile需要通过TextFile文件转化加载
1. 适合于小量数据,但是查询列比较多的场景
Parquet 1. 更高效的压缩和编码
2. 可用于多种数据处理框架
1. 不支持update, insert, delete, ACID 1. 适用于字段数非常多,无更新,只取部分列的查询
ORC(OptimizedRC File) 1. 列式存储,存储效率非常高
2. 可压缩,高效的列存取
3. 查询效率较高,支持索引
4. 支持矢量化查询
1. 加载时性能消耗较大
2. 需要通过text文件转化生成
3. 读取全量数据时性能较差
1. 适用于Hive中大型的存储、查询
数据压缩
  1. 压缩的优点

    • 减小文件存储所占空间
    • 加快文件传输效率,从而提高系统的处理速度
    • 降低IO读写的次数
  2. 压缩的缺点

    • 使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长
  3. 压缩配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    --开启输出压缩
    set mapreduce.output.fileoutputformat.compress=true;
    --配置压缩类型为Snappy
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

    -- 中间结果压缩
    set hive.exec.compress.intermediate=true;
    set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
    -- 输出结果压缩
    set hive.exec.compress.output=true;

    永久配置:

    1. 将以上MapReduce的配置写入mapred-site.xml中,重启Hadoop

    2. 将以上Hive的配置写入hive-site.xml中,重启Hive

存储优化
  1. 避免小文件的生成

    1
    2
    3
    4
    5
    6
    7
    8
    -- 如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
    set hive.merge.mapfiles=true;
    -- 如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
    set hive.merge.mapredfiles=true;
    -- 每一个合并的文件的大小
    set hive.merge.size.per.task=256000000;
    -- 平均每个文件的大小,如果小于这个值就会进行合并
    set hive.merge.smallfiles.avgsize=16000000;
  2. 读取小文件

    1
    2
    -- 设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  3. ORC文件索引

    一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个column的min/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes。而其中为每个stripe建立的包含min/max值的索引,就称为Row Group Index行组索引

    为了使Row Group Index有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max会失去意义。另外,这种索引主要用于数值型字段的范围查询过滤优化上

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    -- 开启索引配置
    set hive.optimize.index.filter=true;

    -- 创建表,并指定构建索引
    create table tb_sogou_orc_index
    stored as orc tblproperties ("orc.create.index"="true")
    as select * from tb_sogou_source
    distribute by stime
    sort by stime;

    -- 当进行范围或者等值查询(<,>,=)时就可以基于构建的索引进行查询
    select count(*) from tb_sogou_orc_index where stime > '12:00:00' and stime < '18:00:00';

    建表时候,通过表参数”orc.bloom.filter.columns”=”columnName……”来指定为哪些字段建立BloomFilter索引,这样,在生成数据的时候,会在每个stripe中,为该字段建立BloomFilter的数据结构,当查询条件中包含对该字段的=号过滤时候,先从BloomFilter中获取以下是否包含该值,如果不包含,则跳过该stripe。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    -- 创建表指定创建布隆索引
    create table tb_sogou_orc_bloom
    stored as orc tblproperties ("orc.create.index"="true","orc.bloom.filter.columns"="stime,userid")
    as select * from tb_sogou_source
    distribute by stime
    sort by stime;


    -- stime的范围过滤可以走row group index,userid的过滤可以走bloom filter index
    select
    count(*)
    from tb_sogou_orc_index
    where stime > '12:00:00' and stime < '18:00:00'
    and userid = '3933365481995287' ;
  4. ORC矢量化查询

    Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。

    注意:要使用矢量化查询执行,就必须以ORC格式存储数据。

    1
    2
    set hive.vectorized.execution.enabled = true;
    set hive.vectorized.execution.reduce.enabled = true;

计算Job执行优化

Explain

常用语法命令如下:

1
EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|]  query
  • FORMATTED:对执行计划进行格式化,返回JSON格式的执行计划
  • EXTENDED:提供一些额外的信息,比如文件的路径信息
  • DEPENDENCY:以JSON格式返回查询所依赖的表和分区的列表
  • AUTHORIZATION:列出需要被授权的条目,包括输入与输出
MapReduce属性优化
  1. 本地模式

    使用Hive的过程中,有一些数据量不大的表也会转换为MapReduce处理,提交到集群时,需要申请资源,等待资源分配,启动JVM进程,再运行Task,一系列的过程比较繁琐,本身数据量并不大,提交到YARN运行返回会导致性能较差的问题。

    Hive为了解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交给YARN,直接在本地运行,以便于提高小数据量程序的性能。

    1
    2
    -- 开启本地模式
    set hive.exec.mode.local.auto = true;

    如果以下任意一个条件不满足,那么即使开启了本地模式,将依旧会提交给YARN集群运行:

    • 处理的数据量不超过128M
    • MapTask的个数不超过4个
    • ReduceTask的个数不超过1个
  2. JVM重用

    JVM正常指代一个Java进程,Hadoop默认使用派生的JVM来执行map-reducer,如果一个MapReduce程序中有100个Map,10个Reduce,Hadoop默认会为每个Task启动一个JVM来运行,那么就会启动100个JVM来运行MapTask,在JVM启动时内存开销大,尤其是Job大数据量情况,如果单个Task数据量比较小,也会申请JVM资源,这就导致了资源紧张及浪费的情况。

    为了解决上述问题,MapReduce中提供了JVM重用机制来解决,JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放,N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。

    1
    2
    3
    -- Hadoop3之前的配置,在mapred-site.xml中添加以下参数
    -- Hadoop3中已不再支持该选项
    mapreduce.job.jvm.numtasks=10
  3. 并行执行

    Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。

    1
    2
    3
    4
    -- 开启Stage并行化,默认为false
    SET hive.exec.parallel=true;
    -- 指定并行化线程数,默认为8
    SET hive.exec.parallel.thread.number=16;
Join优化
  1. Map Join

    1
    2
    3
    4
    5
    -- Hive中小表的大小限制
    -- 2.0版本之前的控制属性
    hive.mapjoin.smalltable.filesize=25M
    -- 2.0版本开始由以下参数控制
    hive.auto.convert.join.noconditionaltask.size=512000000
  2. Reduce Join

  3. Bucket Join

    1. Bucket Join

      语法:clustered by colName

      参数:set hive.optimize.bucketmapjoin = true;

      要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数

    2. Sort Merge Bucket Join(SMB):基于有序的数据Join

      语法:clustered by colName sorted by (colName)

      参数:

      1
      2
      3
      4
      5
      -- 开启分桶SMB join
      set hive.optimize.bucketmapjoin = true;
      set hive.auto.convert.sortmerge.join=true;
      set hive.optimize.bucketmapjoin.sortedmerge = true;
      set hive.auto.convert.sortmerge.join.noconditionaltask=true;

      要求:分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数

优化器
  1. 关联优化

    1
    2
    -- 对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现
    set hive.optimize.correlation=true;
  2. 优化引擎

    1. RBO

      • rule basic optimise:基于规则的优化器
      • 根据设定好的规则来对程序进行优化
    2. CBO

      • cost basic optimise:基于代价的优化器
      • 根据不同场景所需要付出的代价来合适选择优化的方案
      • 对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案
      1
      2
      3
      4
      set hive.cbo.enable=true;
      set hive.compute.query.using.stats=true;
      set hive.stats.fetch.column.stats=true;
      set hive.stats.fetch.partition.stats=true;
      • 要想使用CBO引擎,必须构建数据的元数据【表行数、列的信息、分区的信息……】

      • 提前获取这些信息,CBO才能基于代价选择合适的处理计划

      • 所以CBO引擎一般搭配analyze分析优化器一起使用

  3. Analyze分析优化器

    用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    -- 构建分区信息元数据
    ANALYZE TABLE tablename
    [PARTITION(partcol1[=val1], partcol2[=val2], ...)]
    COMPUTE STATISTICS [noscan];

    -- 构建列的元数据
    ANALYZE TABLE tablename
    [PARTITION(partcol1[=val1], partcol2[=val2], ...)]
    COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];

    -- 查看元数据
    DESC FORMATTED [tablename] [columnname];
谓词下推(PPD)

谓词下推 Predicate Pushdown(PPD)的思想简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,降低了Reduce端的数据负载,节约了集群的资源,也提升了任务的性能。

1
2
-- 默认自动开启谓词下推
hive.optimize.ppd=true;

image-20220519235156534

  • Inner Join和Full outer Join,条件写在on后面,还是where后面,性能上面没有区别
  • Left outer Join时 ,右侧的表写在on后面,左侧的表写在where后面,性能上有提高
  • Right outer Join时,左侧的表写在on后面、右侧的表写在where后面,性能上有提高
  • 如果SQL语句中出现不确定结果的函数,也无法实现下推
数据倾斜
  1. group By的数据倾斜

    当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。

    方案一:开启Map端聚合

    1
    2
    -- 开启Map端聚合:Combiner
    hive.map.aggr=true;

    通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜

    方案二:实现随机分区

    1
    2
    -- SQL中避免数据倾斜,构建随机分区
    select * from table distribute by rand();
    • distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等

    • 默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实现随机分区,避免数据倾斜

    方案三:自动构建随机分区并自动聚合

    1
    2
    -- 开启随机分区,走两个MapReduce 
    hive.groupby.skewindata=true;
    • 开启该参数以后,当前程序会自动通过两个MapReduce来运行

    • 第一个MapReduce自动进行随机分区,然后实现聚合

    • 第二个MapReduce将聚合的结果再按照业务进行处理,得到结果

  2. Join的数据倾斜

    实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾斜问题:

    方案一:提前过滤,将大数据变成小数据,实现Map Join

    方案二:使用Bucket Join

    方案三:使用Skew Join

    Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    -- 开启运行过程中skewjoin
    set hive.optimize.skewjoin=true;
    -- 如果这个key的出现的次数超过这个范围
    set hive.skewjoin.key=100000;
    -- 在编译时判断是否会产生数据倾斜
    set hive.optimize.skewjoin.compiletime=true;
    -- 不合并,提升性能
    set hive.optimize.union.remove=true;
    -- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
    set mapreduce.input.fileinputformat.input.dir.recursive=true;