Skip to content

Kafka集群版本搭建

1. 集群规划

主机组件名
192.168.101.105(hadoop105)broker、zookeeper
192.168.101.106(hadoop106)broker、zookeeper
192.168.101.107(hadoop107)broker、zookeeper

2. 准备工作

2.1 JDK

  1. 下载JDK,并上传到/opt/software目录下。
    下载地址:https://www.oracle.com/java/technologies/downloads/#java8-linux
  2. 解压JDK到指定的目录下
sh
[jack@hadoop105 software]$ tar -xvf jdk-8u391-linux-x64.tar.gz -C ../module/
  1. 配置环境变量 在/etc/profile.d创建kafka_env.sh
sh
[jack@hadoop105 profile.d]$ sudo vi kafka_env.sh
## 填写以下内容暴露环境变量
[jack@hadoop105 profile.d]$ cat kafka_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_391
export PATH=$PATH:$JAVA_HOME/bin

2.2 Kafka

  1. 下载Kafka,并上传到/opt/software目录下。
    下载地址: https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz
  2. 解压安装Kafka
sh
# 将下载好的Kafka的安装包上传到/root/software目录下
# 进入到/root/software目录下,解压安装
[jack@hadoop105 software]$ tar -xvf kafka_2.12-3.6.1.tgz -C ../module/
## 原名太长,更名方便后续的使用
[jack@hadoop105 software]$ mv ../module/kafka_2.12-3.6.1 kafka-3.6.1
  1. 配置环境变量 在/etc/profile.d目录下编辑kafka_env.sh文件
sh
[jack@hadoop105 profile.d]$ sudo vi kafka_env.sh
[jack@hadoop105 profile.d]$ cat kafka_env.sh 
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_391
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.6.1
export PATH=$PATH:$KAFKA_HOME/bin
## 配置生效
[jack@hadoop105 profile.d]$ source /etc/profile
[jack@hadoop105 profile.d]$ echo $KAFKA_HOME
/opt/module/kafka-3.6.1

2.3 修改配置文件

  1. 修改配置文件$KAFKA_HOME/config/server.properties
ini
# 当前节点Kafka实例的ID号,必须是整数,且在整个集群中不能重复
broker.id=1
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,
#kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka-3.6.1/data
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认是 1 个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop105:2181,hadoop106:2181,hadoop107:2181/kafka
  1. 将修改配置文件发送到其他的节点
sh
# 创建数据存储目录
[jack@hadoop105 module]$ mkdir kafka-3.6.1/data
# 分发Kafka
[jack@hadoop105 module]$ scp -r kafka-3.6.1/ jack@192.168.101.106:/opt/module/
[jack@hadoop105 module]$ scp -r kafka-3.6.1/ jack@192.168.101.107:/opt/module/
# 环境变量也同步一下
[jack@hadoop105 profile.d]$ scp kafka_env.sh root@192.168.101.106:/etc/profile.d/
[jack@hadoop105 profile.d]$ scp kafka_env.sh root@192.168.101.107:/etc/profile.d/
  1. 分别在hadoop106和hadoop107上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3
sh
[jack@hadoop106 ~]$ vi config/server.properties
broker.id=2
  1. 清理data文件夹下的数据文件
sh
[jack@hadoop107 data]$ rm -rf *

3. 启动集群

3.1 启动ZooKeeper

Kafka是非常依赖ZooKeeper的,需要在ZooKeeper上存储元数据信息,因此必须要先启动ZooKeeper。依次在三个指点上启动ZooKeeper就可以。 Kafka2.8.0以后也可以配置不采用ZooKeeper。

3.2 启动Kafka

sh
## 依次在三台机器上面执行下面命令启动Kafka
[jack@hadoop105 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties
  1. 设置免密远程登录
sh
[jack@hadoop105 .ssh]$ cd /home/jack/.ssh
[jack@hadoop105 .ssh]$ ssh-keygen
[jack@hadoop105 .ssh]$ ssh-copy-id hadoop106
[jack@hadoop105 .ssh]$ ssh-copy-id hadoop107
  1. 一键启停脚本 在/home/jack/bin目录下编写脚本 kafka-cluster.sh,并赋予执行权限。调用脚本的时候需要带上参数start或者stop
sh
#!/bin/bash

COMMAND=$1
if [ ! $COMMAND ];then
    echo "please input your operation"
    exit -1
fi
if [ $COMMAND != "start" -a $COMMAND != "stop" ];then
    echo "please input your operation in[start | stop]"
    exit -1
fi

# 配置Kafka的安装路径
KAFKA_HOME=/opt/module/kafka-3.6.1
# 遍历所有的Broker的节点名字
H0STS=( hadoop105 hadoop106 hadoop107 )

for HOST in ${H0STS[@]}; do
    msg="$HOST ..."
	
    ssh -T $HOST << TERMINATE
    if [ $COMMAND = "start" ];then
        $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
    elif [ $COMMAND = "stop" ];then
        $KAFKA_HOME/bin/kafka-server-stop.sh
    fi
    exit
TERMINATE
    msg=${msg}"done!"
    echo $msg
done
  1. 添加执行权限
sh
[jack@hadoop105 ~]$ chmod +x kafka-cluster.sh
  1. 执行集群启动
sh
[jack@hadoop105 ~]$ kafka-cluster start
hadoop105 ...done!
hadoop106 ...done!
hadoop107 ...done!

警告

停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。