Skip to content

Kafka单机版搭建

单机版本的搭建快速、简单,但是使用场景非常局限。仅适用于进行简单的测试阶段。

1. 下载解压

1.1 下载安装包

  1. 下载Kafka地址:https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz

信息

安装包的命名中,2.12表示Scala的版本。由于Kafka是使用Scala语言来编写的,因此在下载Kafka的时候, 会让你选择Scala语言的版本。你只需要选择与你本地的Scala版本相符的Kafka即可。

  1. 下载JDK地址: https://www.oracle.com/java/technologies/downloads/#java8-linux

1.2 上传解压缩文件

sh
# 将下载好的Kafka的安装包和jdk-8u391-linux-x64.tar.gz上传到/root/software目录下
# 进入到/root/software目录下,解压安装
[jack@hadoop105 software]$ tar -xvf kafka_2.12-3.6.1.tgz -C ../module/
## 更名方便后续的使用
[jack@hadoop105 software]$ mv ../module/kafka_2.12-3.6.1 kafka-3.6.1
[jack@hadoop105 software]$ tar -xvf jdk-8u391-linux-x64.tar.gz -C ../module/

1.3 配置环境

在/etc/profile.d创建kafka_env.sh

sh
[jack@hadoop105 profile.d]$ sudo vi kafka_env.sh
## 填写以下内容暴露环境变量
[jack@hadoop105 profile.d]$ cat kafka_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_391
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.6.1
export PATH=$PATH:$KAFKA_HOME/bin
## 配置生效
[jack@hadoop105 profile.d]$ source /etc/profile
[jack@hadoop105 profile.d]$ echo $KAFKA_HOME
/opt/module/kafka-3.6.1

2. 启动服务

2.1 启动ZooKeeper

Kafka严重依赖ZooKeeper,所以我们在启动Kafka之前,需要启动ZooKeeper服务。Kafka安装包里提供了zk服务,但仅适用于单机模式(zk也是单实例)。 多broker模式强烈建议使用自行部署的ZooKeeper。

  1. Kafka自带的ZooKeeper的启停脚本如下:
sh
[jack@hadoop105 bin]$ ll ./zookeeper*
-rwxr-xr-x. 1 jack wheel  867 11月 24 17:38 ./zookeeper-security-migration.sh
-rwxr-xr-x. 1 jack wheel 1393 11月 24 17:38 ./zookeeper-server-start.sh
-rwxr-xr-x. 1 jack wheel 1366 11月 24 17:38 ./zookeeper-server-stop.sh
-rwxr-xr-x. 1 jack wheel 1019 11月 24 17:38 ./zookeeper-shell.sh
  1. 启用Kafka自带的ZooKeeper服务:
sh
[jack@hadoop105 bin]$ ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[jack@hadoop105 bin]$ jps
53238 Jps
53213 QuorumPeerMain

2.2 修改Kafka配置

sh
[jack@hadoop105 bin]$ vim ../config/server.properties 
## 添加如下内容
listeners=PLAINTEXT://192.168.101.105:9092

表示Kafka监听192.168.101.105上的9092端口,使用PLAINTEXT协议(无加密)

2.3 启动Kafka

通过daemon进程启动Kafka

sh
[jack@hadoop105 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties 
[jack@hadoop105 bin]$ jps
53620 Kafka
53656 Jps
53213 QuorumPeerMain

启动之后,可以使用jps命令查看当前节点的进程。如果出现名为Kafka的进程,说明Kafka的单机版本搭建完成。

3. 配置开机启动

3.1 编写单机启动脚本

sh
#!/bin/bash

COMMAND=$1
if [ ! $COMMAND ];then
    echo "please input your operation"
    exit 1
fi
if [ "$COMMAND" != "start" -a "$COMMAND" != "stop" ];then
    echo "please input your operation in[start | stop]"
    exit 1
fi

# 配置Kafka的安装路径
KAFKA_HOME=/opt/module/kafka-3.7.2

# 获取进程ID
get_pid() {
    local process_name=$1
    ps -ef | grep "$process_name" | grep -v grep | awk '{print $2}'
}

if [ "$COMMAND" = "start" ];then
  if [ -z $(get_pid "QuorumPeerMain") ];then
      echo "Zookeeper is not running. starting Zookeeper ...."
      $KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
      ## 稍等zookeeper启动成功后
      ZookeeperVersion=$($KAFKA_HOME/bin/zookeeper-shell.sh 127.0.0.1:2181 version |grep -w "version"|  awk -F':' '{print $2}')
     echo "current Zookeeper version : $ZookeeperVersion"
  fi
  echo "starting kafka"
  $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
elif [ "$COMMAND" = "stop" ];then
  echo "stop kafka...."
  $KAFKA_HOME/bin/kafka-server-stop.sh
fi
echo "execute $COMMAND finish"

3.2 添加服务

sh
sudo vim /etc/systemd/system/kafka.service
## 添加如下内容
[Unit]
Description=Apache Kafka
After=network.target

[Service]
Type=forking
Environment="JAVA_HOME=/opt/module/jdk-17.0.13"
User=root
Group=root
ExecStart=/home/jack/bin/kf.sh start
ExecStop=/home/jack/bin/kf.sh stop
PrivateTmp=true

[Install]
WantedBy=multi-user.target

3.3 重新加载service

sh
sudo systemctl daemon-reload

3.4 设置开机自启

sh
sudo systemctl enable kafka

4. 测试Kafka

4.1 创建主题

在Kafka中,消息是需要存储于Topic主题中的,需要提前创建Topic。

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --create \
--bootstrap-server hadoop105:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test
Created topic test.
## 查询已经创建了的主题
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --list \
--bootstrap-server hadoop105:9092
test

其中参数说明:

  • bootstrap-server: kafka节点地址
  • replication-factor: 副本因子
  • partitions: 分区
  • topic: 主题

4.2 发送消息

Kafka自带一个生产者客户端kafka-console-producer.sh,它将从文件或标准输入中获取输入, 并将其发送到指定的消息服务器,服务端收到后将作为topic的消息存储起来。如下所示,运行Producer,然后输入一些信息到控制台并发送到服务端:

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-producer.sh \
--broker-list hadoop105:9092 \
--topic test \
kafka is a message mq

4.3 消费消息

Kafka有一个消费者客户端命令,它可以将消费的消息输出到控制台:

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-consumer.sh \
--bootstrap-server hadoop105:9092 \
--topic test
kafka is a message mq