数仓开发之ODS层
1. 概述
采集到Kafka的topic_log和topic_db主题的数据即为实时数仓的ODS层,这一层的作用是对数据做原样展示和备份。 需要注意的是,数据生成工具模拟数据的业务时间是打散在一天不同时刻的,而实时要求数据的业务时间为系统时间,修改application.yml,将mock.if-realtime
置为1,生成数据最新时间截至为一天的当前时刻,配置如下:
yml
mock.if-realtime: 1
2. Kafka数据有序
本项目要求Flink单个并行度的数据严格有序(主要用于数据去重),为了保证这一点,做了如下配置。
- 修改Kafka配置文件server.properties,将topic分区数设置为4
ini
# 在配置文件中修改Kafka主题的默认分区数
num.partitions=4
三台服务器的配置文件都要修改,修改完毕重启Kafka,不可直接分发配置文件,因为每个节点的broker.id必须唯一。分发后必须修改broker.id才能重启Kafka,否则重启失败。
2. 在Flink程序中设置并行度为4
3. 同步维度历史数据
实时计算不考虑历史的事实数据,但要考虑历史维度数据。因此要对维度相关的业务表做一次全量同步。
- 与维度相关的业务表如下:
activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info - 创建mysql_to_kafka_dim_init.sh文件
sh
[jack@hadoop102 bin]$ vim mysql_to_kafka_dim_init.sh
#!/bin/bash
# 该脚本的作用是初始化所有的业务数据,只需执行一次
MAXWELL_HOME=/opt/module/maxwell-1.29.2
import_data() {
for tab in $@
do
$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall_v4 --table ${tab} --config $MAXWELL_HOME/config.properties
done
}
case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)
import_data $1
;;
"all")
import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info
;;
esac
- 增加执行权限
sh
chmod +x mysql_to_kafka_dim_init.sh
- 执行脚本(注意:需要启动Maxwell进程)
sh
[jack@hadoop102 bin]$ mysql_to_kafka_dim_init.sh all
- 启动Kafka消费者,观察数据是否写入Kafka
sh
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db