Skip to content

Kafka主题

topic可以实现消息的分类,不同消费者订阅不同的topic。
关于topic的操作脚本:kafka-topics.sh

1. 主题命令参数

参数描述
--bootstrap-server ip:port连接的Kafka Broker主机名称和端口号
--topic 主题名称操作的 topic 名称
--create创建主题
--delete删除主题。
--alter修改主题,用于修改分区数
--list查看所有主题。
--describe查看主题详细描述,多个用逗号隔开。
--partitions 分区数设置分区数。
--replication-factor 副本数设置分区副本。
--config name=value形式更新系统默认的配置。

2. 创建主题

sh
[root@hadoop105 kafka]# bin/kafka-topics.sh --create \
--topic hadoop \ ## 指定要创建的topic的名称
--bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 \ ##指定kafka关联的zk地址
--partitions 3 \ ##指定该topic的分区个数
--replication-factor 3 ##指定副本因子

## 查看zk上面节点变化信息
[jack@hadoop105 bin]$ ./zookeeper-shell.sh localhost:2181
ls /kafka/brokers/topics
[hadoop]

危险

指定副本因子的时候,不能大于broker实例个数,否则报错

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --create --topic hadoop --bootstrap-server localhost:9092 --replication-factor 4 --partitions 3
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2024-03-12 01:30:34,555] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)

3. 查看所有topic

sh
## 查看创建的主题
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --list \
--bootstrap-server hadoop102:9092,hadoop103:9092,hadoop105:9092
hadoop

4. 查看主题详情

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --describe -topic hadoop
Topic: hadoop	TopicId: 7K55cLp0RjSJZtZ8xIiM5Q	PartitionCount: 3	ReplicationFactor: 3	Configs: 
	Topic: hadoop	Partition: 0	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
	Topic: hadoop	Partition: 1	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: hadoop	Partition: 2	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2

Topic信息详情说明:
第一行: Topic为主题名,TopicId为全局唯一ID, PartitionCount表示hadoop主题一共有3个分区,ReplicationFactor表示每个分区有3个副本。
第二行至最后一行: 格式为主机名+分区编号(Partition)+当前分区的主brokerid+分区所有副本所在的brokerid+当前副本的存活的brokerid

提示

主题的分区中的副本存在主从关系,而分区本身不区分主从。此外需要区分分区编号不是brokerid,分区编号从0开始,而broerid是主机配置在主机中server.properties文件的整型值。

5. 修改主题分区

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --alter -topic hadoop  --partitions 4

警告

分区数只能增加,不能减少

6. 删除主题

sh
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --list
demo1
hadoop
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --delete -topic demo1
[jack@hadoop105 kafka-3.6.1]$ ./bin/kafka-topics.sh --bootstrap-server hadoop105:9092,hadoop106:9092,hadoop107:9092 --list
hadoop

7. 删除主题数据

使用kafka-delete-records.sh脚本可以删除指定的offset位置,删除之前的所有老数据,保留指定offset位置和之后的数据。

7.1 编辑offsets.json

该文件指定了要删除数据的主题、分区和偏移量:

sh
[jack@hadoop101 bin]$ cat offsets.json 
{
    "partitions": [
        {"topic": "myTopic", "partition": 0, "offset": 38}
    ],
    "version": 1
}

7.2 删除数据

执行以下命令来删除数据:

sh
[jack@hadoop101 bin]$ ./kafka-delete-records.sh --bootstrap-server 192.168.101.101:9092 --offset-json-file offsets.json
Executing records delete operation
Records delete operation completed:
partition: myTopic-0    low_watermark: 38