E-commerce Data Warehouse (Part 2): Business Data Collection Platform

These are study notes for the video tutorial "E-commerce Data Warehouse V5.0" by 尚硅谷 (Atguigu).

E-commerce Business Data

Simulating Business Data

MySQL Installation

See Hadoop (Part 3): Hive.

Business Data Generation
Create the tables with the provided script.
Generate the business data:
# 1. Create the db_log directory under /opt/module/ on hadoop102
[eitan@hadoop102 ~]$ mkdir /opt/module/db_log

# 2. Upload gmall2020-mock-db-2021-11-14.jar and application.properties to /opt/module/db_log on hadoop102

# 3. Adjust the settings in application.properties as needed
[eitan@hadoop102 ~]$ vim /opt/module/db_log/application.properties

# 4. Run the following command in that directory to generate data for 2020-06-14
[eitan@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
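The keys that usually need adjusting are the business date and the two reset flags; the first-day values (repeated in the summary at the end of this article) are:

# Business date of the generated data
mock.date=2020-06-14
# Reset the database: must be 1 on the first day, 0 afterwards
mock.clear=1
# Reset users: must be 1 on the first day, 0 afterwards
mock.clear.user=1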

Data Warehouse Environment Setup

Hive Installation and Deployment

# 1. Upload apache-hive-3.1.2-bin.tar.gz to /opt/software on the Linux host, then extract it
[eitan@hadoop102 ~]$ tar -zxf /opt/software/apache-hive-3.1.2-bin.tar.gz -C /opt/module/

# 2. Rename the extracted apache-hive-3.1.2-bin directory to hive
[eitan@hadoop102 ~]$ mv /opt/module/apache-hive-3.1.2-bin/ /opt/module/hive

# 3. Edit /etc/profile.d/my_env.sh and add the environment variables
[eitan@hadoop102 ~]$ sudo vim /etc/profile.d/my_env.sh
#HIVE_HOME
export HIVE_HOME=/opt/module/hive
export PATH=$PATH:$HIVE_HOME/bin
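For the new variables to take effect in the current shell, reload the profile (or open a new session):

[eitan@hadoop102 ~]$ source /etc/profile.d/my_env.sh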

Configuring the Hive Metastore to Use MySQL

# 1. Copy the MySQL JDBC driver jar into Hive's lib directory
#    (the jar ships inside the Connector/J tarball, so extract it first)
[eitan@hadoop102 ~]$ tar -zxf /opt/software/mysql-connector-java-8.0.29.tar.gz -C /opt/software/
[eitan@hadoop102 ~]$ cp /opt/software/mysql-connector-java-8.0.29/mysql-connector-java-8.0.29.jar /opt/module/hive/lib/

# 2. Configure MySQL as the metadata store in hive-site.xml
[eitan@hadoop102 conf]$ vim hive-site.xml
<configuration>
    <!-- MySQL connection settings for the metastore -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>

    <!-- MySQL account used by the metastore; adjust to your environment -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
    </property>

    <!-- Bind host on which to run the HiveServer2 Thrift interface -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>

    <!-- Metastore service address (remote mode) -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083</value>
    </property>

    <!-- Disable metastore notification API authorization -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>

    <!-- Disable metastore schema version verification -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <!-- Show the current database and column headers in the CLI -->
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
</configuration>

# 3. Create hive-env.sh from the template and adjust it as needed
[eitan@hadoop102 conf]$ cp hive-env.sh.template hive-env.sh
[eitan@hadoop102 conf]$ vim hive-env.sh

Starting Hive

Initialize the metastore database
[eitan@hadoop102 ~]$ /opt/module/hive/bin/schematool -initSchema -dbType mysql -verbose
Change the metastore database character set

Hive's metastore database defaults to the Latin1 character set, which cannot store Chinese characters, so Chinese comments in CREATE TABLE statements would end up garbled.

-- Run these statements against the metastore database (hive, as set in the ConnectionURL above) in MySQL
-- Column comments
ALTER TABLE COLUMNS_V2 MODIFY COLUMN COMMENT VARCHAR(256) CHARACTER SET utf8;
-- Table comments
ALTER TABLE TABLE_PARAMS MODIFY COLUMN PARAM_VALUE MEDIUMTEXT CHARACTER SET utf8;

Write a Hive start/stop script

[eitan@hadoop102 ~]$ vim bin/myhive.sh
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit;
fi

case $1 in
"start")
    echo "================ Starting Hive ================"
    echo "---------------- Starting metastore ----------------"
    ssh hadoop102 "nohup /opt/module/hive/bin/hive --service metastore > /home/eitan/log/metastore.out 2>&1 &"
    echo "---------------- Starting hiveserver2 ----------------"
    ssh hadoop102 "nohup /opt/module/hive/bin/hiveserver2 > /home/eitan/log/hiveserver2.out 2>&1 &"
;;
"stop")
    echo "================ Stopping Hive ================"
    echo "---------------- Stopping metastore ----------------"
    ssh hadoop102 "ps -ef | grep metastore | grep -v grep | awk '{print \$2}' | xargs kill -9"
    echo "---------------- Stopping hiveserver2 ----------------"
    ssh hadoop102 "ps -ef | grep hiveserver2 | grep -v grep | awk '{print \$2}' | xargs kill -9"
;;
*)
    echo "Input Args Error..."
;;
esac

[eitan@hadoop102 ~]$ chmod u+x bin/myhive.sh
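The script writes service logs to /home/eitan/log, which has to exist before the first start. Assuming ~/bin is on the PATH (as with the other helper scripts in these notes), a quick smoke test looks like this:

[eitan@hadoop102 ~]$ mkdir -p /home/eitan/log
[eitan@hadoop102 ~]$ myhive.sh start
[eitan@hadoop102 ~]$ ps -ef | grep -i hive | grep -v grep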

Business Data Collection Module

Overview of Business Data Synchronization

Synchronization strategies

There are two synchronization strategies: full synchronization and incremental synchronization.

Full synchronization copies the entire contents of the business database to the data warehouse every day. It is the simplest way to keep the two sides consistent.

Incremental synchronization copies only each day's new and changed records to the data warehouse. Tables synchronized incrementally usually need a one-off full synchronization on the first day.

Choosing a synchronization strategy
Strategy | Pros | Cons
Full synchronization | Simple logic | Can be inefficient: for a large table whose data changes little from day to day, a daily full sync repeatedly transfers and stores large amounts of identical data.
Incremental synchronization | Efficient; no duplicate transfer or storage | More complex logic: each day's new and changed records must be merged with the existing data before they can be used.

(Figure: synchronization strategy chosen for each table)

Synchronization tools
Incremental sync solution | DataX / Sqoop | Maxwell / Canal
Requirements on the database | Query-based: to pick up new and changed rows via SELECT, the table must carry fields such as create_time and update_time that identify the changes. | The database must record change operations, e.g. MySQL must have binlog enabled.
Intermediate states | Offline batch sync: if a row changes several times within one day, only its final state is captured; the intermediate states are lost. | All change operations are captured in real time, so every intermediate state of a changed row is available.
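To make the query-based approach in the middle column concrete, the daily extraction query that a DataX or Sqoop job would issue looks roughly like the following (table and column names are illustrative; it assumes the table carries create_time and update_time columns):

[eitan@hadoop102 ~]$ mysql -hhadoop102 -uroot -proot -e "
SELECT *
FROM   gmall.order_info
WHERE  DATE(create_time) = '2020-06-14'
   OR  DATE(update_time) = '2020-06-14';"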
Deploying the synchronization tools
DataX
# 1. Upload and extract the archive
[eitan@hadoop102 ~]$ tar -zxf /opt/software/datax.tar.gz -C /opt/module/

# 2. Run the DataX self-test job
[eitan@hadoop102 ~]$ python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
Maxwell
# 1. Upload and extract the archive
[eitan@hadoop102 ~]$ tar -zxf /opt/software/maxwell-1.29.2.tar.gz -C /opt/module/
[eitan@hadoop102 ~]$ mv /opt/module/maxwell-1.29.2/ /opt/module/maxwell

# 2. Enable the MySQL binlog
[eitan@hadoop102 ~]$ sudo vim /etc/my.cnf
[mysqld]
# Server id
server-id = 1
# Enable the binlog; this value is used as the binlog file name prefix
log-bin=mysql-bin
# Binlog format; Maxwell requires row
binlog_format=row
# Database(s) to log; adjust to your setup
binlog-do-db=gmall
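Changes to /etc/my.cnf take effect only after MySQL is restarted. Assuming MySQL runs as the mysqld systemd service with the default data directory /var/lib/mysql, the restart and a quick binlog check look like this:

[eitan@hadoop102 ~]$ sudo systemctl restart mysqld
[eitan@hadoop102 ~]$ sudo ls /var/lib/mysql | grep mysql-bin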
-- 3. Create the database and user required by Maxwell
-- Create the database
CREATE DATABASE maxwell;

-- Lower the MySQL password policy so the simple demo password below is accepted
set global validate_password_policy=0;
set global validate_password_length=4;

-- Create the maxwell user and grant it the required privileges
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
# 4. Configure Maxwell
[eitan@hadoop102 ~]$ cp /opt/module/maxwell/config.properties.example /opt/module/maxwell/config.properties

[eitan@hadoop102 ~]$ vim /opt/module/maxwell/config.properties
log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=maxwell

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
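The mxw.sh helper used later in these notes is not shown in this section; it presumably wraps Maxwell's own launcher, which, given the configuration above, can be started in the background roughly like this (a sketch, not the tutorial's exact script):

[eitan@hadoop102 ~]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon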

Full-Table Data Synchronization

Data channel

Full tables are synchronized by DataX directly from the MySQL business database to HDFS; the data flow is shown in the figure below.

(Figure: full-table data flow, MySQL → DataX → HDFS)

DataX configuration file generation script
[eitan@hadoop102 ~]$ vim ./bin/gen_import_config.py
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

# MySQL connection settings; adjust to your environment
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "root"

# HDFS NameNode settings; adjust to your environment
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

# Output directory for the generated DataX job files
output_path = "/opt/module/datax/job/import"


# Open a MySQL connection
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


# Fetch a table's metadata: column names and data types
def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


# Column names of a MySQL table (used by the mysqlreader)
def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


# Map MySQL data types to Hive data types (used by the hdfswriter)
def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


# Generate the DataX job json file for one table
def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])

# Install the Python MySQL driver
[eitan@hadoop102 ~]$ sudo yum install -y MySQL-python

# Script usage example
[eitan@hadoop102 ~]$ python ~/bin/gen_import_config.py -d gmall -t activity_info

# Create the gen_import_config.sh script in ~/bin
[eitan@hadoop102 ~]$ vim ~/bin/gen_import_config.sh
#!/bin/bash
python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info

[eitan@hadoop102 ~]$ chmod u+x ~/bin/gen_import_config.sh

# Run gen_import_config.sh
[eitan@hadoop102 ~]$ ./bin/gen_import_config.sh
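After the script runs, the output directory configured in gen_import_config.py should contain one job file per table, named gmall.<table>.json; a quick check:

[eitan@hadoop102 ~]$ ls /opt/module/datax/job/import/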
Testing a generated DataX configuration file
# 1. Create the target directory
[eitan@hadoop102 ~]$ hadoop fs -mkdir -p /origin_data/gmall/db/activity_info_full/2020-06-14

# 2. Run the DataX sync job
[eitan@hadoop102 ~]$ python /opt/module/datax/bin/datax.py -p "-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json

# 3. Inspect the data on HDFS
[eitan@hadoop102 ~]$ hadoop fs -cat /origin_data/gmall/db/activity_info_full/2020-06-14/* | zcat
2022-06-04 20:25:41,798 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
1 联想专场 3101 联想满减 2020-10-21 18:49:12 2020-10-31 18:49:15
2 Apple品牌日 3101 Apple品牌日 2020-06-10 00:00:00 2020-06-12 00:00:00
3 女神节 3102 满件打折 2020-03-08 00:00:00 2020-03-09 00:00:00
Full-table synchronization script
[eitan@hadoop102 ~]$ vim ~/bin/mysql_to_hdfs_full.sh


[eitan@hadoop102 ~]$ chmod u+x ~/bin/mysql_to_hdfs_full.sh
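The body of mysql_to_hdfs_full.sh is not reproduced above. A minimal sketch of such a wrapper, assuming the job files generated earlier under /opt/module/datax/job/import, the /origin_data/gmall/db/<table>_full/<date> target convention used in the test above, and the mysql_to_hdfs_full.sh all 2020-06-14 invocation from the summary, might look like this:

#!/bin/bash
# Sketch only, not the tutorial's exact script.
DATAX_HOME=/opt/module/datax

# Business date: use the second argument if given, otherwise yesterday.
if [ -n "$2" ]; then
    do_date=$2
else
    do_date=$(date -d '-1 day' +%F)
fi

import_data() {
    datax_config=$1
    target_dir=$2
    # Create the HDFS target directory, then pass it to DataX via -Dtargetdir.
    hadoop fs -mkdir -p "$target_dir"
    python $DATAX_HOME/bin/datax.py -p "-Dtargetdir=$target_dir" "$datax_config"
}

case $1 in
"all")
    # One job file per table was generated as gmall.<table>.json
    for config in $DATAX_HOME/job/import/gmall.*.json; do
        table=$(basename "$config" .json | cut -d. -f2)
        import_data "$config" "/origin_data/gmall/db/${table}_full/$do_date"
    done
    ;;
*)
    import_data "$DATAX_HOME/job/import/gmall.$1.json" "/origin_data/gmall/db/${1}_full/$do_date"
    ;;
esac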

Incremental Table Data Synchronization

Data channel

(Figure: incremental data flow, MySQL binlog → Maxwell → Kafka → Flume → HDFS)

Maxwell configuration

According to the plan, 13 tables (cart_info, comment_info, and so on) need incremental synchronization. By default Maxwell captures changes for every table recorded in the binlog, so it has to be configured to synchronize only these 13 tables.

In addition, to make the data easier to consume downstream, Maxwell is configured to send each table's changes to its own Kafka topic. The final Maxwell configuration is as follows:

[eitan@hadoop102 ~]$ vim /opt/module/maxwell/config.properties
log_level=info

producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka_topic=%{table}

# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

# Table filter: synchronize only the 13 tables needed
filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use,include:gmall.favor_info,include:gmall.order_detail,include:gmall.order_detail_activity,include:gmall.order_detail_coupon,include:gmall.order_info,include:gmall.order_refund_info,include:gmall.order_status_log,include:gmall.payment_info,include:gmall.refund_payment,include:gmall.user_info

# Start ZooKeeper, Kafka, and Maxwell, then check that Kafka receives the change messages
[eitan@hadoop102 ~]$ zk.sh start
[eitan@hadoop102 ~]$ kf.sh start
[eitan@hadoop102 ~]$ mxw.sh restart

# Start a Kafka console consumer on any one of the topics
[eitan@hadoop102 ~]$ /opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic cart_info
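For reference, each record Maxwell publishes is a JSON object whose top-level fields include database, table, type, ts (epoch seconds), and data; an insert on cart_info would look roughly like this (values are illustrative):

{"database":"gmall","table":"cart_info","type":"insert","ts":1592192400,"xid":1234,"commit":true,"data":{"id":1,"user_id":"27","sku_id":8,"sku_num":1,"create_time":"2020-06-15 10:20:00"}}

The ts field is what the Flume timestamp interceptor below promotes into the event header.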
Flume configuration
Configuration overview

Flume moves the data from the Kafka topics to HDFS, so the agent uses a Kafka Source and an HDFS Sink, with a File Channel in between.

Note that the Kafka Source must subscribe to all 13 topics, and the HDFS Sink must write each topic's data to its own path, with a date component in the path to separate each day's data. The key configuration is shown below:

(Figure: key Flume configuration for the Kafka-to-HDFS agent)

Configuration in practice
[eitan@hadoop102 ~]$ cp /opt/module/flume/job/kafka_to_hdfs.conf /opt/module/flume/job/kafka_to_hdfs_db.conf
[eitan@hadoop102 ~]$ vim /opt/module/flume/job/kafka_to_hdfs_db.conf
## Agent components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info

a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic

a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000

# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.db.TimestampInterceptor$Builder

# channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/

a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6


# sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db-
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Write gzip-compressed output files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Wiring: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
Writing the Flume interceptor
package com.atguigu.flume.interceptor.db;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * Reads the "ts" field (epoch seconds) from each Maxwell JSON record and puts it
 * into the event's "timestamp" header (epoch milliseconds), so that the HDFS sink
 * partitions files by event time rather than processing time.
 */
public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        JSONObject jsonObject = JSONObject.parseObject(new String(event.getBody()));

        Long ts = jsonObject.getLong("ts");
        headers.put("timestamp", String.valueOf(ts * 1000));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

Upload the jar package

(Figure: the packaged interceptor jar placed in Flume's lib directory)
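The exact build and upload commands are not shown in these notes. Assuming a standard Maven project for the interceptor and the Flume installation on hadoop104 used by f3.sh below, the steps would look roughly like this (the jar name is a placeholder for whatever your pom actually produces):

# On the development machine: package the interceptor (with dependencies such as fastjson bundled)
mvn clean package

# Copy the jar into Flume's lib directory on hadoop104 (path and file name are illustrative)
scp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar eitan@hadoop104:/opt/module/flume/lib/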

Writing the Flume start/stop script

[eitan@hadoop102 ~]$ vim ./bin/f3.sh
#!/bin/bash
case $1 in
"start")
    echo "-------- Starting business-data Flume on hadoop104 --------"
    ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
    ;;
"stop")
    echo "-------- Stopping business-data Flume on hadoop104 --------"
    ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
    ;;
esac
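Like the other helper scripts, f3.sh has to be made executable before it can be used:

[eitan@hadoop102 ~]$ chmod u+x ~/bin/f3.sh
[eitan@hadoop102 ~]$ f3.sh start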
First-day full synchronization of incremental tables

Incremental tables normally get one full synchronization on the first day and incremental synchronization thereafter; the first-day full load can be done with Maxwell's bootstrap feature.

[eitan@hadoop102 ~]$ vim ~/bin/mysql_to_kafka_inc_init.sh
#!/bin/bash

# This script performs the one-off initialization (bootstrap) of all incremental tables

MAXWELL_HOME=/opt/module/maxwell

import_data(){
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
    import_data cart_info
    ;;
"comment_info")
    import_data comment_info
    ;;
"coupon_use")
    import_data coupon_use
    ;;
"favor_info")
    import_data favor_info
    ;;
"order_detail")
    import_data order_detail
    ;;
"order_detail_activity")
    import_data order_detail_activity
    ;;
"order_detail_coupon")
    import_data order_detail_coupon
    ;;
"order_info")
    import_data order_info
    ;;
"order_refund_info")
    import_data order_refund_info
    ;;
"order_status_log")
    import_data order_status_log
    ;;
"payment_info")
    import_data payment_info
    ;;
"refund_payment")
    import_data refund_payment
    ;;
"user_info")
    import_data user_info
    ;;
"all")
    import_data cart_info
    import_data comment_info
    import_data coupon_use
    import_data favor_info
    import_data order_detail
    import_data order_detail_activity
    import_data order_detail_coupon
    import_data order_info
    import_data order_refund_info
    import_data order_status_log
    import_data payment_info
    import_data refund_payment
    import_data user_info
    ;;
esac
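The summary below invokes this script as mysql_to_kafka_inc_init.sh all, so make it executable first:

[eitan@hadoop102 ~]$ chmod u+x ~/bin/mysql_to_kafka_inc_init.sh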

Summary

User behavior log collection

(Flowchart: user behavior log collection pipeline)

# 1. Base environment: Hadoop cluster and ZooKeeper cluster
[eitan@hadoop102 ~]$ jpsall
--------- hadoop102 ----------
11056 JobHistoryServer
10886 NodeManager
11146 QuorumPeerMain
10602 DataNode
10459 NameNode
--------- hadoop103 ----------
10752 QuorumPeerMain
10256 ResourceManager
10391 NodeManager
10079 DataNode
--------- hadoop104 ----------
7971 QuorumPeerMain
7827 NodeManager
7741 SecondaryNameNode
7630 DataNode

# 2. Start the services
[eitan@hadoop102 ~]$ kf.sh start
[eitan@hadoop102 ~]$ f1.sh start
[eitan@hadoop102 ~]$ f2.sh start

# 3. Generate mock log data
[eitan@hadoop102 ~]$ lg.sh

# 4. Generate log data for 2020-06-15
[eitan@hadoop102 ~]$ vim /opt/module/applog/application.yml
# Business date
mock.date: "2020-06-15"

[eitan@hadoop102 ~]$ xsync /opt/module/applog/application.yml
[eitan@hadoop102 ~]$ lg.sh

Business data collection

(Flowchart: business data collection pipeline)

# 1. Generate the DataX job configuration files
[eitan@hadoop102 ~]$ gen_import_config.sh

# 2. Generate the initial business data
[eitan@hadoop102 ~]$ vim /opt/module/db_log/application.properties
# Business date
mock.date=2020-06-14
# Reset the database: must be 1 on the first day, 0 afterwards
mock.clear=1
# Reset users: must be 1 on the first day, 0 afterwards
mock.clear.user=1

[eitan@hadoop102 ~]$ cd /opt/module/db_log/
[eitan@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar

# 3. Run the full synchronization with DataX
[eitan@hadoop102 ~]$ mysql_to_hdfs_full.sh all 2020-06-14
# 1. Start Maxwell and Flume
[eitan@hadoop102 ~]$ mxw.sh start
[eitan@hadoop102 ~]$ f3.sh start

# 2. Use Maxwell's bootstrap feature for the first-day full synchronization
[eitan@hadoop102 ~]$ mysql_to_kafka_inc_init.sh all
# 1. Edit the Maxwell configuration file
[eitan@hadoop102 ~]$ vim /opt/module/maxwell/config.properties
# Teaching-edition-only setting; keep it consistent with mock.date in /opt/module/db_log/application.properties
mock_date=2020-06-15

# 2. Restart Maxwell
[eitan@hadoop102 ~]$ mxw.sh restart

# 3. Edit application.properties and set the same date
[eitan@hadoop102 ~]$ vim /opt/module/db_log/application.properties
# Business date
mock.date=2020-06-15
# Reset the database: must be 1 on the first day, 0 afterwards
mock.clear=0
# Reset users: must be 1 on the first day, 0 afterwards
mock.clear.user=0

# 4. Run the generator to produce data
[eitan@hadoop102 ~]$ cd /opt/module/db_log/
[eitan@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar