Kafka单机版搭建
单机版本的搭建快速、简单,但是使用场景非常局限。仅适用于进行简单的测试阶段。
1. 下载解压
1.1 下载安装包
信息
安装包的命名中,2.12表示Scala的版本。由于Kafka是使用Scala语言来编写的,因此在下载Kafka的时候, 会让你选择Scala语言的版本。你只需要选择与你本地的Scala版本相符的Kafka即可。
1.2 上传解压缩文件
sh
# 将下载好的Kafka的安装包和jdk-8u391-linux-x64.tar.gz上传到/root/software目录下
# 进入到/root/software目录下,解压安装
[jack@hadoop105 software]$ tar -xvf kafka_2.12-3.6.1.tgz -C ../module/
## 更名方便后续的使用
[jack@hadoop105 software]$ mv ../module/kafka_2.12-3.6.1 kafka-3.6.1
[jack@hadoop105 software]$ tar -xvf jdk-8u391-linux-x64.tar.gz -C ../module/
1.3 配置环境
在/etc/profile.d创建kafka_env.sh
sh
[jack@hadoop105 profile.d]$ sudo vi kafka_env.sh
## 填写以下内容暴露环境变量
[jack@hadoop105 profile.d]$ cat kafka_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_391
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.6.1
export PATH=$PATH:$KAFKA_HOME/bin
## 配置生效
[jack@hadoop105 profile.d]$ source /etc/profile
[jack@hadoop105 profile.d]$ echo $KAFKA_HOME
/opt/module/kafka-3.6.1
2. 启动服务
2.1 启动ZooKeeper
Kafka严重依赖ZooKeeper,所以我们在启动Kafka之前,需要启动ZooKeeper服务。Kafka安装包里提供了zk服务,但仅适用于单机模式(zk也是单实例)。 多broker模式强烈建议使用自行部署的ZooKeeper。
- Kafka自带的ZooKeeper的启停脚本如下:
sh
[jack@hadoop105 bin]$ ll ./zookeeper*
-rwxr-xr-x. 1 jack wheel 867 11月 24 17:38 ./zookeeper-security-migration.sh
-rwxr-xr-x. 1 jack wheel 1393 11月 24 17:38 ./zookeeper-server-start.sh
-rwxr-xr-x. 1 jack wheel 1366 11月 24 17:38 ./zookeeper-server-stop.sh
-rwxr-xr-x. 1 jack wheel 1019 11月 24 17:38 ./zookeeper-shell.sh
- 启用Kafka自带的ZooKeeper服务:
sh
[jack@hadoop105 bin]$ ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[jack@hadoop105 bin]$ jps
53238 Jps
53213 QuorumPeerMain
2.2 修改Kafka配置
sh
[jack@hadoop105 bin]$ vim ../config/server.properties
## 添加如下内容
listeners=PLAINTEXT://192.168.101.105:9092
表示Kafka监听192.168.101.105上的9092端口,使用PLAINTEXT协议(无加密)
2.3 启动Kafka
通过daemon进程启动Kafka
sh
[jack@hadoop105 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties
[jack@hadoop105 bin]$ jps
53620 Kafka
53656 Jps
53213 QuorumPeerMain
启动之后,可以使用jps命令查看当前节点的进程。如果出现名为Kafka的进程,说明Kafka的单机版本搭建完成。
2.4 单机启动脚本
sh
#!/bin/bash
COMMAND=$1
if [ ! $COMMAND ];then
echo "please input your operation"
exit 1
fi
if [ $COMMAND != "start" -a $COMMAND != "stop" ];then
echo "please input your operation in[start | stop]"
exit 1
fi
# 配置Kafka的安装路径
KAFKA_HOME=/opt/module/kafka-3.6.1
# 获取Zookeeper的进程号
ZOOKEEPER_PID=$(ps -ef | grep zookeeper | grep -v grep | awk '{print $2}')
if [ "$COMMAND" = "start" ];then
if [ -z "$ZOOKEEPER_PID" ]; then
echo "Zookeeper is not running. starting Zookeeper ...."
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
fi
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
elif [ "$COMMAND" = "stop" ];then
## 先停掉kafka, 再停掉zookeeper
$KAFKA_HOME/bin/kafka-server-stop.sh
if [ -n "$ZOOKEEPER_PID" ]; then
echo "Zookeeper is running. stopping Zookeeper ...."
$KAFKA_HOME/bin/zookeeper-server-stop.sh
fi
fi
echo "$COMMAND 操作完毕"
3. 测试Kafka
3.1 创建主题
在Kafka中,消息是需要存储于Topic主题中的,需要提前创建Topic。
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --create \
> --bootstrap-server hadoop105:9092 \
> --replication-factor 1 \
> --partitions 1 \
> --topic test
Created topic test.
## 查询已经创建了的主题
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --list \
> --bootstrap-server hadoop105:9092
test
其中参数说明:
bootstrap-server: kafka节点地址
replication-factor: 副本因子
partitions: 分区
topic: 主题
3.2 发送消息
Kafka自带一个生产者客户端kafka-console-producer.sh
,它将从文件或标准输入中获取输入, 并将其发送到指定的消息服务器,服务端收到后将作为topic的消息存储起来。
如下所示,运行Producer,然后输入一些信息到控制台并发送到服务端:
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-producer.sh \
> --broker-list hadoop105:9092 \
> --topic test \
> kafka is a message mq
3.3 消费消息
Kafka有一个消费者客户端命令,它可以将消费的消息输出到控制台:
sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-consumer.sh \
> --bootstrap-server hadoop105:9092 \
> --topic test
kafka is a message mq