
Integrating Kafka with Third-Party Frameworks

1. Spring Boot Integration with Kafka

1.1 Spring Boot Environment Setup

Create a Spring Boot project with Spring Initializr: visit https://start.spring.io/ to generate and download the scaffold (if the network is unstable, you can use the mirror at https://start.aliyun.com/).
Select the following dependencies:

  • Web: Spring Web
  • Messaging: Spring for Apache Kafka

1.2 Open the Downloaded Project in IDEA

Reorganize the Maven module structure as shown: the parent project kafka-integration contains two child modules, kafka-integration-producer and kafka-integration-consumer. kafka-integration-producer sends messages and kafka-integration-consumer receives them. The pom.xml files of the three projects are shown below.

The parent kafka-integration pom.xml:

xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-integration</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>kafka-integration</name>
    <description>Third-party framework integration with Kafka</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.7.6</spring-boot.version>
    </properties>

    <modules>
        <module>kafka-integration-producer</module>
        <module>kafka-integration-consumer</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
            </plugin>
        </plugins>
    </build>
</project>
The kafka-integration-producer module's pom.xml:

xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>kafka-integration</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>kafka-integration-producer</artifactId>
    <name>kafka-integration-producer</name>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.6</version>
            </plugin>
        </plugins>
    </build>
</project>
The kafka-integration-consumer module's pom.xml:

xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>kafka-integration</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>kafka-integration-consumer</artifactId>
    <name>kafka-integration-consumer</name>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.6</version>
            </plugin>
        </plugins>
    </build>
</project>
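
Each module also needs the @SpringBootApplication entry class that Spring Initializr normally generates. If it was lost while reorganizing the modules, a minimal sketch for the producer module looks like this (the class name ProducerApplication is an assumption; the consumer module needs its own equivalent):

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical entry class for the kafka-integration-producer module.
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}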

1.3 Spring Boot Producer

  1. Edit the Spring Boot configuration file application.yml and add the producer settings:
yml
server:
  port: 8081
spring:
  kafka:
    bootstrap-servers: 192.168.101.105:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test555
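
The examples below write to a topic named hadoop. If that topic does not already exist on the broker and auto-creation is disabled, spring-kafka can create it at startup from a NewTopic bean; the following sketch shows one way to declare it (the partition and replica counts are illustrative assumptions, not values from the original setup):

java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Optional: declare the "hadoop" topic so the auto-configured KafkaAdmin creates it on startup.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic hadoopTopic() {
        // partition/replica counts below are placeholders; size them for your cluster
        return TopicBuilder.name("hadoop")
                .partitions(3)
                .replicas(1)
                .build();
    }
}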

1.4 Create a Controller That Receives Data from the Browser and Writes It to the Target Topic

java
@Controller
public class BasicController {

    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/produceMsg")
    @ResponseBody
    public String hello(String msgContent) {
        if (msgContent != null) {
            // write the received content to the "hadoop" topic
            kafkaTemplate.send("hadoop", msgContent);
            return "message sent";
        } else {
            return "msgContent must not be empty";
        }
    }
}

Start the producer application.
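
KafkaTemplate.send() is asynchronous, so the controller reports success as soon as the record is handed to the producer. To react to the broker acknowledgement, a callback can be attached to the returned future. A rough sketch, assuming Spring Boot 2.7.x / spring-kafka 2.x where send() returns a ListenableFuture (later versions return a CompletableFuture instead); the class name AckLoggingSender is illustrative:

java
import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

// Hypothetical helper that logs the broker acknowledgement for each send.
@Component
public class AckLoggingSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String msgContent) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("hadoop", msgContent);
        future.addCallback(
                result -> System.out.println("sent to partition "
                        + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}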

1.5 Spring Boot Consumer

Edit the Spring Boot configuration file application.yml:

yml
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.101.105:9092
    consumer:
      group-id: test000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.6 Create a Class That Consumes Data from the Target Topic

java
@Component
public class KafkaConsumer {

    // listen on the "hadoop" topic; the parameter receives each record's value
    @KafkaListener(topics = "hadoop")
    public void consumeTopic(String msg) {
        System.out.println("Received message: " + msg);
    }
}
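
The listener above only receives the record value. If the key, partition, or offset is also needed, @KafkaListener can accept the whole ConsumerRecord instead; a small sketch on the same topic (the class name KafkaRecordConsumer is illustrative):

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener variant that exposes the full record metadata.
@Component
public class KafkaRecordConsumer {

    @KafkaListener(topics = "hadoop")
    public void consumeRecord(ConsumerRecord<String, String> record) {
        System.out.println("partition=" + record.partition()
                + " offset=" + record.offset()
                + " key=" + record.key()
                + " value=" + record.value());
    }
}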

1.7 Use IDEA's Built-in HTTP Client to Send Data to the /produceMsg Endpoint

Send the following request:

POST http://localhost:8081/produceMsg
Content-Type: application/x-www-form-urlencoded

msgContent=999

Check the consumer application's console; the message should be printed there.

2. Flink Integration with Kafka

Flink is a widely used component in big data development. It can act as a Kafka producer or as a Kafka consumer.

2.1 Flink Environment Setup

  1. Create a Maven project named flink-kafka.
  2. Add the following dependencies to its pom.xml:
xml
<properties>
    <flink.version>1.18.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.1.0-1.18</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.2</version>
    </dependency>
</dependencies>
  3. Add a log4j.properties file under resources:
ini
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

2.2 Flink Producer

The producer reads a small in-memory collection and writes each element to the hadoop topic:

java
public class FlinkProducer {

    public static void main(String[] args) throws Exception {
        // 1 Initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 2 Read data from a collection
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");
        DataStream<String> stream = env.fromCollection(wordsList);
        // 3 Kafka producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
        // 4 Create the Kafka producer (legacy FlinkKafkaProducer API)
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "hadoop",
                new SimpleStringSchema(),
                properties
        );
        // 5 Attach the producer to the Flink stream as a sink
        stream.addSink(kafkaProducer);
        // 6 Execute the job
        env.execute();
    }
}
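
Version note: FlinkKafkaProducer (and FlinkKafkaConsumer below) belong to the legacy connector API, which newer flink-connector-kafka releases replace with KafkaSink and KafkaSource, so these classes may not resolve against the 3.1.0-1.18 dependency declared above. If they do not, the sink can be wired roughly as follows; this is only a sketch reusing the broker and topic from the example above, and the class name FlinkProducerNewApi is made up for illustration:

java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class FlinkProducerNewApi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromCollection(Arrays.asList("hello", "world"));

        // KafkaSink is the current replacement for the legacy FlinkKafkaProducer
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop105:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("hadoop")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        stream.sinkTo(sink);
        env.execute();
    }
}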

2.3 Flink Consumer

The consumer subscribes to the hadoop topic and prints every record it receives:

java
public class FlinkConsumer {

    public static void main(String[] args) throws Exception {
        // 1 Initialize the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 2 Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink");
        // 3 Create the Kafka consumer (legacy FlinkKafkaConsumer API)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "hadoop",
                new SimpleStringSchema(),
                properties
        );
        // 4 Attach the consumer as a source and print the stream
        env.addSource(kafkaConsumer).print();
        // 5 Execute the job
        env.execute();
    }
}
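
Likewise, if FlinkKafkaConsumer is unavailable with the declared connector version, KafkaSource is the newer equivalent. A minimal sketch under the same broker, topic, and group-id assumptions (the class name FlinkConsumerNewApi is illustrative):

java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkConsumerNewApi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource is the current replacement for the legacy FlinkKafkaConsumer
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop105:9092")
                .setTopics("hadoop")
                .setGroupId("flink")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute();
    }
}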

2.4 Start the FlinkConsumer, Then the FlinkProducer, in That Order

Watch the data printed in the IDEA console.

3. Spark Integration with Kafka


3.1 Spark Environment Setup

  1. Create a Maven project named spark-kafka.
  2. Right-click the spark-kafka project, choose Add Framework Support, and check Scala.
  3. Create a scala folder under main, right-click it and choose Mark Directory as Sources Root, then create the package com.example.spark under scala.
  4. Add a log4j.properties file under resources:
ini
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
  5. Configure the project's pom.xml with the following dependencies:
xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.2</version>
    </dependency>
</dependencies>

3.2 Spark Producer

scala
object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // 0 Kafka configuration
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop105:9092,hadoop106:9092,hadoop107:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    // 1 Create the Kafka producer
    val producer = new KafkaProducer[String, String](properties)
    // 2 Send data
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String, String]("hadoop", "jack" + i))
    }
    // 3 Release resources
    producer.close()
  }
}

3.3 Spark Consumer

scala
object SparkKafkaConsumer {
    def main(args: Array[String]): Unit = {
        // 1 Create the SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
        // 2 Create the StreamingContext with a 3-second batch interval
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        // 3 Define the Kafka parameters: broker addresses, consumer group, key/value deserializers
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "demoGroup",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )
        // 4 Create a DStream that reads from Kafka
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream[String, String](
                ssc,
                LocationStrategies.PreferConsistent, // preferred location strategy
                ConsumerStrategies.Subscribe[String, String](Set("hadoop"), kafkaPara) // subscribe to the topic(s) with the given parameters
            )
        // 5 Extract the value of each record
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
        // 6 Print the values
        valueDStream.print()
        // 7 Start the job and wait for termination
        ssc.start()
        ssc.awaitTermination()
    }
}

3.4 Start the SparkKafkaConsumer, Then the SparkKafkaProducer

Check the console output.

4. Flume Integration with Kafka

Flume is a widely used component in big data development. It can act as a Kafka producer or as a Kafka consumer.

4.1 Flume Producer


  1. Start a Kafka console consumer to watch the topic (a sample agent configuration follows in step 2 below):
sh
sh kafka_3.6.2/bin/kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic first
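
  2. Configure a Flume agent whose sink writes to the Kafka topic first. The original agent configuration is not included here, so the following is only a sketch: the netcat source on port 44444 and the file name file-flume-kafka.conf are illustrative assumptions; replace the source with whatever actually feeds your data (for example a taildir source). Save the file in Flume's job directory and start the agent with the same flume-ng command shown in section 4.2; lines arriving at the source should then appear in the console consumer started in step 1.
ini
# 1 Define the components (illustrative demo agent)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 Configure the source (netcat is only a stand-in for real input)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 Configure the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop105:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
# 5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1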

4.2 Flume Consumer


  1. Start a Kafka console producer:
sh
bin/kafka-console-producer.sh --bootstrap-server hadoop105:9092 --topic first
  2. Configure the Flume agent: create kafka-flume-file.conf in Flume's job directory:
ini
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  3. Start the Flume agent:
sh
/opt/module/flume-1.11.0/bin/flume-ng agent --conf conf --conf-file /opt/module/flume-1.11.0/job/kafka-flume-file.conf --name a1 -Dflume.root.logger=INFO,LOGFILE

Type messages into the Kafka console producer and watch the Flume agent's log output; each message should be printed by the logger sink.