Skip to content

日志数据采集平台搭建

1. 日志生成

  1. 通过java编写的日志生成工具:https://gitee.com/javaee_home/log-collector ,下载后编译打包即可。
  2. 编写日志生成脚本log.sh
sh
#!/bin/bash
echo "========== hadoop105 =========="
ssh hadoop105 "cd /opt/module/app/; nohup java -jar -DHappenDate=$1 log-collector-1.0-jar-with-dependencies.jar 1>/dev/null 2>&1 &"

使用的话, 传入指定日期即可在/tmp/logs/下面生成对应的日志:

sh
[jack@hadoop105 logs]$ log.sh 2020-03-10

2. 日志采集Flume安装

集群规划:

服务器hadoop110服务器hadoop111服务器hadoop112
Flume(采集日志)FlumeFlume

2.1 安装部署Flume

  1. 访问官网https://flume.apache.org/download.html ,下载flume并上传服务器/opt/software目录:
sh
[jack@hadoop102 software]$ ll
总用量 718600
-rw-rw-r--. 1 jack jack  87380462 10月 28 00:00 apache-flume-1.11.0-bin.tar.gz
-rw-rw-r--. 1 jack jack 141887242 10月 31 2024 jdk-8u391-linux-x64.tar.gz
  1. 解压到/opt/module, 并且重名文件夹
sh
[jack@hadoop102 software]$ tar -xvf apache-flume-1.11.0-bin.tar.gz -C ../module/
[jack@hadoop102 software]$ cd ../module/
[jack@hadoop102 module]$ mv apache-flume-1.11.0-bin/ flume-1.11.0
  1. conf目录下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
sh
[jack@hadoop102 module]$ cd flume-1.11.0/conf
[jack@hadoop102 module]$ cp flume-env.sh.template flume-env.sh
[jack@hadoop102 conf]$ vi flume-env.sh
## 修改文件中JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_144
  1. 修改日志配置
    flume1.11之后使用的是log4j2作为日志框架, 相比之前的flume修改log4j.properties不同:
sh
[jack@hadoop110 conf]$ ll
总用量 28
-rw-r--r--. 1 jack jack 1661 11月  5 14:16 flume-conf.properties.template
-rw-r--r--. 1 jack jack 1455 11月  5 14:16 flume-env.ps1.template
-rw-r--r--. 1 jack jack 1632 11月  5 20:52 flume-env.sh
-rw-r--r--. 1 jack jack 1568 11月  5 14:16 flume-env.sh.template
-rw-r--r--. 1 jack jack 2427 11月  5 23:13 log4j2.xml
[jack@hadoop110 conf]$ vim log4j2.xml
## 修改LOG_DIR路径改为绝对路径
 <Property name="LOG_DIR">/opt/module/flume-1.11.0/logs</Property>
  1. 创建logs文件夹
sh
mkdir /opt/module/flume-1.11.0/logs
  1. 分发hadoop111服务器
sh
[jack@hadoop110 module]$ scp flume-1.11.0 jack@hadoop111:/opt/module/

2.2 Flume扩展组件

  1. Source
    Taildir Source相比Exec SourceSpooling Directory Source的优势:
    • TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
    • Exec Source: 可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
    • Spooling Directory Source: 监控目录,不支持断点续传。

推荐使用TailDir Source
2. batchSize大小如何设置?
答:Event大小若为1K左右时,500-1000合适(默认为100)
3. Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。 注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

2.3 日志采集Flume配置

  1. 总体思路:
    Alt text Flume直接读log日志的数据, log日志的格式是app-yyyy-mm-dd.log。图中ETL拦截器主要用于简单过滤非法数据(json数据需要大括号开头和结尾), 类型区分拦截器主要用于将启动日志和事件日志区分开发给不同的Topic。
  2. 在conf目录下创建file-flume-kafka.conf文件
    flume采集配置一共分为几大部分:
  • 定义组件
  • 配置source
  • 配置channel
  • 配置sink
  • 拼接组件
sh
[jack@hadoop102 conf]$ vim file-flume-kafka.conf
# 在文件配置代理a1如下内容
# 定义组件
a1.sources=r1
a1.channels=c1 c2

# 配置source 
a1.sources.r1.type = TAILDIR
## 表示读取f1目录下面app开头的文件
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
# 记录日志读取位置
a1.sources.r1.positionFile = /opt/module/flume-1.11.0/position/log_position.json
a1.sources.r1.filegroups = f1

a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
# ETL拦截器
a1.sources.r1.interceptors.i1.type = com.rocket.interceptor.LogETLInterceptor$Builder
# 日志类型拦截器 
a1.sources.r1.interceptors.i2.type = com.rocket.interceptor.LogTypeInterceptor$Builder
# 区分日志类型,replicating是发送所有通道,multiplexing自定义发送通道
a1.sources.r1.selector.type = multiplexing
# multiplexing底层通过flume数据中的header判断区分
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# 配置channel  kafka相关配置
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop110:9092,hadoop111:9092,hadoop112:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop110:9092,hadoop111:9092,hadoop112:9092
a1.channels.c2.kafka.topic = topic_event
# 取消
a1.channels.c2.parseAsFlumeEvent = false
## 配置c1和c2是同一组的
a1.channels.c2.kafka.consumer.group.id = flume-consumer
  1. 创建相关文件夹
sh
mkdir  /opt/module/flume-1.11.0/position/

2.4 Flume的ETL和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志。
日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

  1. 创建Maven工程flume-interceptor
  2. 创建包名:com.jack.flume.interceptor
  3. 在pom.xml文件中添加如下配置
xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  1. 创建LogETLInterceptor类, LogUtils类
java
public class LogETLInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

   // 单个Event事件处理
    @Override
    public Event intercept(Event event) {
        // 1. 获取数据
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2 判断数据类型并向Header中赋值
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)){
                return event;
            }
        }else {
            if (LogUtils.validateEvent(log)){
                return event;
            }
        }
        // 3 返回校验结果
        return null;
    }
   // 多个Event事件处理
    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept1 = intercept(event);
            // 校验合格就放入集合中
            if (intercept1 != null){
                interceptors.add(intercept1);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
java
public class LogUtils {
    public static boolean validateStart(String log) {
        if (log == null){
            return false;
        }
        // 校验json
        if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){
            return false;
        }
        return true;
    }

    public static boolean validateEvent(String log) {
        // 1 切割
        String[] logContents = log.split("\\|");
        // 2 校验
        if(logContents.length != 2){
            return false;
        }
        //3 校验服务器时间
        if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){
            return false;
        }
        // 4 校验json
        if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){
            return false;
        }
        return true;
    }
}
  1. 编写Flume日志类型区分拦截器LogTypeInterceptor
java
public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 区分日志类型:   body  header
        // 1 获取body数据
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2 获取header
        Map<String, String> headers = event.getHeaders();
        // 3 判断数据类型并向Header中赋值
        if (log.contains("start")) {
            headers.put("topic","topic_start");
        }else {
            headers.put("topic","topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept1 = intercept(event);
            interceptors.add(intercept1);
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements  Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
  1. 打包
    拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入Flume的lib文件夹下面。
    Alt text

为什么不需要依赖包?

因为依赖包在flume的lib目录下面已经存在了。

  1. 先将打好的包放入到hadoop110的/opt/module/flume-1.11.0/lib文件夹下面
sh
[jack@hadoop110 lib]$ ll | grep flume-interceptor
-rw-rw-r--. 1 jack jack    6423 11月  4 14:12 flume-interceptor-1.0-SNAPSHOT.jar
  1. 分发配置Flume到hadoop111
sh
[jack@hadoop110 conf]$ scp file-flume-kafka.conf jack@hadoop111:/opt/module/flume-1.11.0/conf/
file-flume-kafka.conf                                                                                              100% 1630     1.8MB/s   00:00
[jack@hadoop110 lib]$ scp flume-interceptor-1.0-SNAPSHOT.jar jack@hadoop111:/opt/module/flume-1.11.0/lib/

2.5 日志采集Flume启停脚本

  1. 在/home/jack/bin目录下创建脚本flume_helper
sh
mkdir /opt/module/flume-1.11.0/logs
vim flume_helper
# 在脚本中填写如下内容
#! /bin/bash

case $1 in
"start"){
        for i in hadoop110 hadoop111
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume-1.11.0/bin/flume-ng agent --conf /opt/module/flume-1.11.0/conf/ --conf-file /opt/module/flume-1.11.0/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.11.0/logs/flume.log 2>&1 &"
        done
};;	
"stop"){
        for i in hadoop110 hadoop111
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs kill"
        done

};;
esac

脚本说明:

  • nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
  • awk默认分隔符为空格。
  • xargs表示取出前面命令运行的结果,作为后面命令的输入参数。
    在脚本stop内容中,$2前面加转义是为了区分脚本输入参数的$2。

3. Zookeeper安装

kafka依赖zookeeper,搭建zookeeper集群。

3.1 集群规划

在hadoop110、hadoop111和hadoop112三个节点上部署Zookeeper。具体安装细节可以参考Zookeeper 集群安装

3.2 ZK集群启停脚本

  1. 在hadoop102的/home/jack/bin目录下创建脚本zk_helper:
sh
[jack@hadoop102 bin]$ vim zk_helper
# 在脚本中编写如下内容
#! /bin/bash

case $1 in
"start"){
	for i in hadoop110 hadoop111 hadoop112
	do
        echo "==================== $i ===================="
		ssh $i "/opt/module/zookeeper-3.7.2/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop110 hadoop111 hadoop112
	do
        echo "==================== $i ===================="
		ssh $i "/opt/module/zookeeper-3.7.2/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop110 hadoop111 hadoop112
	do
        echo "==================== $i ===================="
		ssh $i "/opt/module/zookeeper-3.7.2/bin/zkServer.sh status"
	done
};;
esac
  1. 增加脚本执行权限
sh
chmod 777 zk_helper

4. Kafka安装

集群规划:

服务器hadoop110服务器hadoop111服务器hadoop112
KafkaKafkaKafkaKafka

4.1 Kafka集群安装

具体安装细节可以参考Kafka集群版本搭建

4.2 Kafka集群启停脚本

  1. 在/home/jack/bin目录下创建脚本kafka_helper
sh
#! /bin/bash

case $1 in
"start"){
        for i in hadoop110 hadoop111 hadoop112
        do
                echo " --------启动 $i Kafka-------"
                ssh $i "/opt/module/kafka_3.6.2/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop110 hadoop111 hadoop112
        do
                echo " --------停止 $i Kafka-------"
                ssh $i "/opt/module/kafka_3.6.2/bin/kafka-server-stop.sh stop"
        done
};;
esac

4.3 Kafka命令行操作

  1. 查看当前服务器中的所有topic
sh
[jack@hadoop111 kafka_3.6.2]$ bin/kafka-topics.sh --bootstrap-server hadoop110:9092,hadoop111:9092,hadoop112:9092 --list
__consumer_offsets
test
toolbox-kafka-test-topic
topic_event
topic_start

可以发现主题topic_start、topic_event已经自动创建好了,因为Flume会通过组件里面的kafka channel写到kafka集群,如果kafka的topic没有创建,就自动创建。

5. Kafka压力测试

通过压力测试可以知道当前Kafka处理数据能力,更好的设计系统。

5.1 Kafka压测概述

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

  • kafka-consumer-perf-test.sh
  • kafka-producer-perf-test.sh

5.2 Kafka Producer压力测试

sh
[jack@hadoop110 kafka_3.6.2]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop110:9092,hadoop111:9092,hadoop112:9092
  • record-size是一条信息有多大,单位是字节。
  • num-records是总共发送多少条信息。
  • throughput是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
    Kafka会打印下面的信息:
sh
100000 records sent, 23849.272597 records/sec (2.27 MB/sec), 1380.89 ms avg latency, 1764.00 ms max latency, 1396 ms 50th, 1725 ms 95th, 1745 ms 99th, 1764 ms 99.9th.

参数解析:本例中一共写入10w条消息,吞吐量为2.27 MB/sec,每次写入的平均延迟为1380.89毫秒,最大的延迟为 1764.00毫秒。

5.3 Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

sh
[jack@hadoop110 kafka_3.6.2]$ 
./bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop110:9092,hadoop111:9092,hadoop112:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1

测试结果说明:
2024-11-04 21:03:37:844, 2024-11-04 21:03:49:715, 9.5367, 0.8034, 100000, 8423.8902, 455, 11416, 0.8354, 8759.6356
开始测试时间,测试结束数据,共消费数据9.536MB,吞吐量0.8034MB/s,共消费100010条,平均每秒消费8423.8902条。

6. Kafka机器数量计算

Kafka机器数量(经验公式)= 2 *(峰值生产速度 * 副本数 / 100)+ 1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量 = 2 *(50 * 2 / 100)+ 1 = 3台

7. 消费Kafka数据Flume

7.1 Flume配置分析

Alt text

7.2 集群规划

如果需要向HDFS写入数据,Flume需要安装在Hadoop集群上,否则会找不到HDFS文件系统。

服务器hadoop102服务器hadoop103服务器hadoop104
Flume(消费Kafka)Flume

7.3 项目经验之Flume组件选型

  1. FileChannel和MemoryChannel区别
  • MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
  • FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

选型
金融类公司、对钱要求非常准确的公司通常会选择FileChannel。
传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。
2. FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据。
3. Sink:HDFS Sink
我们选择HDFS Sink,因为我们消费的数据采集最终到HDFS上。
a. HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。。
b. HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount。
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件。
(2)文件创建超3600秒时会滚动生成新文件。

7.4 Flume时间拦截器

由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部分数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。
解决的思路: 拦截json日志, 通过fastison框架解析json, 获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp, 因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

  1. 在com.rocket.interceptor包下面创建EventDateInterceptor类\StartDateInterceptor类分别处理event和start类型日志。
java
package com.rocket.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class EventDateInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String str = new String(event.getBody(), StandardCharsets.UTF_8);
        // 1 切割
        String[] logContents = str.split("\\|");
        headers.put("timestamp", logContents[0]);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new EventDateInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
java
package com.rocket.interceptor;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class StartDateInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String str = new String(event.getBody(), StandardCharsets.UTF_8);
        JsonElement jsonElement = JsonParser.parseString(str);
        String timestamp = jsonElement.getAsJsonObject().get("t").getAsString();
        headers.put("timestamp", timestamp);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new StartDateInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
  1. 打包上传到hadoop103的/opt/module/flume-1.11.0/lib下面
sh
[jack@hadoop103 flume-1.11.0]$ ll lib | grep flume-inter
-rw-rw-r--. 1 jack jack    8238 11月  1 08:12 flume-interceptor-1.0-SNAPSHOT.jar

7.5 Flume的具体配置

  1. 在hadoop103的/opt/module/flume-1.11.0/conf目录下创建kafka-flume-hdfs.conf文件
ini
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=hadoop110:9092,hadoop111:9092,hadoop112:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize =5000
a1.sources.r2.batchDurationMillis=2000
a1.sources.r2.kafka.bootstrap.servers=hadoop110:9092,hadoop111:9092,hadoop112:9092
a1.sources.r2.kafka.topics=topic_event

## 配置时间拦截器
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.rocket.interceptor.StartDateInterceptor$Builder
a1.sources.r2.interceptors=i1
a1.sources.r2.interceptors.i1.type=com.rocket.interceptor.EventDateInterceptor$Builder


## channel1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume-1.11.0/checkpoint/behavior1
a1.channels.c1.dataDirs=/opt/module/flume-1.11.0/data/behavior1/
a1.channels.c1.keep-alive=6

## channel2
a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume-1.11.0/checkpoint/behavior2
a1.channels.c2.dataDirs=/opt/module/flume-1.11.0/data/behavior2/
a1.channels.c2.keep-alive=6

## sink1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop102:8020/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=logstart-

##sink2
a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop102:8020/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix=logevent-

## 不要产生大量小文件,生产环境rollInterval配置为3600
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0

a1.sinks.k2.hdfs.rollInterval=10
a1.sinks.k2.hdfs.rollSize=134217728
a1.sinks.k2.hdfs.rollCount=0

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType=CompressedStream 
a1.sinks.k2.hdfs.fileType=CompressedStream 

a1.sinks.k1.hdfs.codeC=lzop
a1.sinks.k2.hdfs.codeC=lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
  1. 创建相关文件夹
sh
mkdir /opt/module/flume-1.11.0/checkpoint
mkdir -p /opt/module/flume-1.11.0/data/behavior1/
mkdir /opt/module/flume-1.11.0/data/behavior1/

7.6 日志消费Flume启动停止脚本

  1. 在/home/jack/bin目录下创建脚本flume_consumer
sh
[jack@hadoop103 bin]$ vim flume_consumer
# 在脚本中填写如下内容
#! /bin/bash

case $1 in
"start"){
        for i in hadoop103
        do
                echo " --------启动 $i 消费flume-------"
                ssh $i "nohup /opt/module/flume-1.11.0/bin/flume-ng agent --conf /opt/module/flume-1.11.0/conf --conf-file /opt/module/flume-1.11.0/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE 1>/opt/module/flume-1.11.0/logs/flume.log 2>&1 &"
        done
};;
"stop"){
        for i in hadoop103
        do
                echo " --------停止 $i 消费flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done

};;
esac
  1. 增加脚本执行权限
sh
chmod 777 f2.sh

7.7 项目经验之Flume内存优化

  1. 问题描述:如果启动消费Flume抛出如下异常
sh
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
  1. 在hadoop103服务器的/opt/module/flume-1.11.0/conf/flume-env.sh文件中增加如下配置
sh
export JAVA_OPTS="-Xms100m -Xmx2048m -Dcom.sun.management.jmxremote"
  1. 同步配置到hadoop111、hadoop112服务器
sh
[jack@hadoop111 conf]$ xsync flume-env.sh
  1. Flume内存参数设置及优化 JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
    -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
    -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

8. 集群维护脚本

8.1 集群时间同步修改脚本

企业开发时,参考hadoop集群时间同步, 集群时间同步

8.2 集群所有进程查看脚本

  1. 在/home/jack/bin目录下创建脚本xcall:
sh
[jack@hadoop102 bin]$ vim xcall
# 在脚本中编写如下内容

#! /bin/bash

for i in hadoop102 hadoop103 hadoop104
do
        echo --------- $i ----------
        ssh $i "$*"
done
  1. 修改脚本执行权限
sh
[jack@hadoop102 bin]$ chmod 777 xcall
  1. 测试脚本
sh
[jack@hadoop102 bin]$ xcall jps

8.3 采集通道启停脚本

如果直接将所有日期日志一次性采集,会发现HDFS集群上面数据结果都在一个日期文件夹📁下面, 也就是说当前集群并不支持按照日期重跑。

sh
#!/bin/bash

execute_flag=0
for i in hadoop102 hadoop103 hadoop104 hadoop110 hadoop111 hadoop112
do
        echo "=========$i=============="
        ssh $i "sudo date -s $1"
        if [ $i = 'hadoop102' ]
        then
            ssh $i "/home/jack/bin/hadoop_helper stop"
            sleep 10s;
            ssh $i "/home/jack/bin/hadoop_helper start"
        fi

        if [ $i = 'hadoop110' ]
        then
            ssh $i "/home/jack/bin/zk_helper stop"
            sleep 3s;
            ssh $i "/home/jack/bin/zk_helper start"

            sleep 3s;

            ssh $i "/home/jack/bin/flume_helper stop"
            sleep 3s;
            ssh $i "/home/jack/bin/flume_helper start"

            sleep 3s;

            ssh $i "/home/jack/bin/kafka_helper stop"
            sleep 3s;
            ssh $i "/home/jack/bin/kafka_helper start"
            execute_flag=1
            sleep 3s;
        fi

        if [ $i = 'hadoop112' ]
        then
            ssh hadoop103 "/home/jack/bin/flume_comsumer stop"
            sleep 3s;
            ssh hadoop103 "/home/jack/bin/flume_comsumer start"
            sleep 3s;
            ssh hadoop110 "nohup java -jar /opt/software/log-collector-1.0-jar-with-dependencies.jar 0 5000 1>/dev/null 2>&1 &"
            ssh hadoop111 "nohup java -jar /opt/software/log-collector-1.0-jar-with-dependencies.jar 0 5000 1>/dev/null 2>&1 &"
            sleep 3s;
        fi
done