Skip to content

Kafka单机版搭建

单机版本的搭建快速、简单,但是使用场景非常局限。仅适用于进行简单的测试阶段。

1. 下载解压

1.1 下载安装包

  1. 下载Kafka地址:https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz

信息

安装包的命名中,2.12表示Scala的版本。由于Kafka是使用Scala语言来编写的,因此在下载Kafka的时候, 会让你选择Scala语言的版本。你只需要选择与你本地的Scala版本相符的Kafka即可。

  1. 下载JDK地址: https://www.oracle.com/java/technologies/downloads/#java8-linux

1.2 上传解压缩文件

sh
# 将下载好的Kafka的安装包和jdk-8u391-linux-x64.tar.gz上传到/root/software目录下
# 进入到/root/software目录下,解压安装
[jack@hadoop105 software]$ tar -xvf kafka_2.12-3.6.1.tgz -C ../module/
## 更名方便后续的使用
[jack@hadoop105 software]$ mv ../module/kafka_2.12-3.6.1 kafka-3.6.1
[jack@hadoop105 software]$ tar -xvf jdk-8u391-linux-x64.tar.gz -C ../module/

1.3 配置环境

在/etc/profile.d创建kafka_env.sh

sh
[jack@hadoop105 profile.d]$ sudo vi kafka_env.sh
## 填写以下内容暴露环境变量
[jack@hadoop105 profile.d]$ cat kafka_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_391
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.6.1
export PATH=$PATH:$KAFKA_HOME/bin
## 配置生效
[jack@hadoop105 profile.d]$ source /etc/profile
[jack@hadoop105 profile.d]$ echo $KAFKA_HOME
/opt/module/kafka-3.6.1

2. 启动服务

2.1 启动ZooKeeper

Kafka严重依赖ZooKeeper,所以我们在启动Kafka之前,需要启动ZooKeeper服务。Kafka安装包里提供了zk服务,但仅适用于单机模式(zk也是单实例)。 多broker模式强烈建议使用自行部署的ZooKeeper。

  1. Kafka自带的ZooKeeper的启停脚本如下:
sh
[jack@hadoop105 bin]$ ll ./zookeeper*
-rwxr-xr-x. 1 jack wheel  867 11月 24 17:38 ./zookeeper-security-migration.sh
-rwxr-xr-x. 1 jack wheel 1393 11月 24 17:38 ./zookeeper-server-start.sh
-rwxr-xr-x. 1 jack wheel 1366 11月 24 17:38 ./zookeeper-server-stop.sh
-rwxr-xr-x. 1 jack wheel 1019 11月 24 17:38 ./zookeeper-shell.sh
  1. 启用Kafka自带的ZooKeeper服务:
sh
[jack@hadoop105 bin]$ ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[jack@hadoop105 bin]$ jps
53238 Jps
53213 QuorumPeerMain

2.2 修改Kafka配置

sh
[jack@hadoop105 bin]$ vim ../config/server.properties 
## 添加如下内容
listeners=PLAINTEXT://192.168.101.105:9092

表示Kafka监听192.168.101.105上的9092端口,使用PLAINTEXT协议(无加密)

2.3 启动Kafka

通过daemon进程启动Kafka

sh
[jack@hadoop105 bin]$ ./kafka-server-start.sh -daemon ../config/server.properties 
[jack@hadoop105 bin]$ jps
53620 Kafka
53656 Jps
53213 QuorumPeerMain

启动之后,可以使用jps命令查看当前节点的进程。如果出现名为Kafka的进程,说明Kafka的单机版本搭建完成。

2.4 单机启动脚本

sh
#!/bin/bash

COMMAND=$1
if [ ! $COMMAND ];then
    echo "please input your operation"
    exit 1
fi
if [ $COMMAND != "start" -a $COMMAND != "stop" ];then
    echo "please input your operation in[start | stop]"
    exit 1
fi

# 配置Kafka的安装路径
KAFKA_HOME=/opt/module/kafka-3.6.1
# 获取Zookeeper的进程号
ZOOKEEPER_PID=$(ps -ef | grep zookeeper | grep -v grep | awk '{print $2}')

if [ "$COMMAND" = "start" ];then
  if [ -z "$ZOOKEEPER_PID" ]; then
      echo "Zookeeper is not running. starting Zookeeper ...."
      $KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
	fi
	$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
elif [ "$COMMAND" = "stop" ];then
  ## 先停掉kafka, 再停掉zookeeper
  $KAFKA_HOME/bin/kafka-server-stop.sh
  if [ -n "$ZOOKEEPER_PID" ]; then
     echo "Zookeeper is running. stopping Zookeeper ...."
     $KAFKA_HOME/bin/zookeeper-server-stop.sh
  fi
fi
echo "$COMMAND 操作完毕"

3. 测试Kafka

3.1 创建主题

在Kafka中,消息是需要存储于Topic主题中的,需要提前创建Topic。

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --create \
> --bootstrap-server hadoop105:9092 \
> --replication-factor 1 \
> --partitions 1 \
> --topic test
Created topic test.
## 查询已经创建了的主题
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --list \
> --bootstrap-server hadoop105:9092
test

其中参数说明:
bootstrap-server: kafka节点地址
replication-factor: 副本因子
partitions: 分区
topic: 主题

3.2 发送消息

Kafka自带一个生产者客户端kafka-console-producer.sh,它将从文件或标准输入中获取输入, 并将其发送到指定的消息服务器,服务端收到后将作为topic的消息存储起来。
如下所示,运行Producer,然后输入一些信息到控制台并发送到服务端:

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-producer.sh \
> --broker-list hadoop105:9092 \
> --topic test \
> kafka is a message mq

3.3 消费消息

Kafka有一个消费者客户端命令,它可以将消费的消息输出到控制台:

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-console-consumer.sh \
> --bootstrap-server hadoop105:9092 \
> --topic test
kafka is a message mq