Skip to content

数仓开发之ODS层

1. 概述

采集到Kafka的topic_log和topic_db主题的数据即为实时数仓的ODS层,这一层的作用是对数据做原样展示和备份。 需要注意的是,数据生成工具模拟数据的业务时间是打散在一天不同时刻的,而实时要求数据的业务时间为系统时间,修改application.yml,将mock.if-realtime置为1,生成数据最新时间截至为一天的当前时刻,配置如下:

yml
mock.if-realtime: 1

2. Kafka数据有序

本项目要求Flink单个并行度的数据严格有序(主要用于数据去重),为了保证这一点,做了如下配置。

  1. 修改Kafka配置文件server.properties,将topic分区数设置为4
ini
# 在配置文件中修改Kafka主题的默认分区数
num.partitions=4

三台服务器的配置文件都要修改,修改完毕重启Kafka,不可直接分发配置文件,因为每个节点的broker.id必须唯一。分发后必须修改broker.id才能重启Kafka,否则重启失败。
2. 在Flink程序中设置并行度为4

3. 同步维度历史数据

实时计算不考虑历史的事实数据,但要考虑历史维度数据。因此要对维度相关的业务表做一次全量同步。

  1. 与维度相关的业务表如下:
    activity_info
    activity_rule
    activity_sku
    base_category1
    base_category2
    base_category3
    base_province
    base_region
    base_trademark
    coupon_info
    coupon_range
    financial_sku_cost
    sku_info
    spu_info
    user_info
  2. 创建mysql_to_kafka_dim_init.sh文件
sh
[jack@hadoop102 bin]$ vim mysql_to_kafka_dim_init.sh
#!/bin/bash

# 该脚本的作用是初始化所有的业务数据,只需执行一次
MAXWELL_HOME=/opt/module/maxwell-1.29.2

import_data() {
    for tab in $@
    do
      $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall_v4 --table ${tab} --config $MAXWELL_HOME/config.properties
    done
}

case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)
  import_data $1 
  ;;
"all")
  import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info
  ;;
esac
  1. 增加执行权限
sh
chmod +x mysql_to_kafka_dim_init.sh
  1. 执行脚本(注意:需要启动Maxwell进程)
sh
[jack@hadoop102 bin]$ mysql_to_kafka_dim_init.sh all
  1. 启动Kafka消费者,观察数据是否写入Kafka
sh
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db