Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

  • 4. Java应用场景——SpringBoot集成
    • 4.1 创建SpringBoot项目
      • 4.1.1 创建SpringBoot项目
      • 4.1.2 修改pom.xml文件
      • 4.1.3 在resources中增加application.yml文件
    • 4.2 编写功能代码
      • 4.2.1 创建配置类:SpringBootKafkaConfig
      • 4.2.2 创建生产者控制器:KafkaProducerController
      • 4.2.3 创建消费者:KafkaDataConsumer
    • 4.3 集成测试
      • 4.3.1 启动ZooKeeper
      • 4.3.2 启动Kafka
      • 4.3.3 启动应用程序
      • 4.3.4 生产数据测试

在这里插入图片描述

在这里插入图片描述

本文档参看的视频是:

  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学

本文档参看的文档是:

  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

在这之前大家可以看我以下几篇文章,循序渐进:

❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(05)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(06)——对标尚硅谷Kafka教程

在这里插入图片描述

4. Java应用场景——SpringBoot集成

Spring Boot帮助您创建可以运行的、独立的、生产级的基于Spring的应用程序。您可以使用Spring Boot创建Java应用程序,这些应用程序可以通过使用java-jar或更传统的war部署启动。
我们的目标是:

  • 为所有Spring开发提供从根本上更快、广泛访问的入门体验。
  • 开箱即用,但随着要求开始偏离默认值,请迅速离开。
  • 提供大型项目(如嵌入式服务器、安全性、指标、健康检查和外部化配置)常见的一系列非功能性功能。
  • 绝对没有代码生成(当不针对原生图像时),也不需要XML配置。

在这里插入图片描述

在这里插入图片描述

4.1 创建SpringBoot项目

4.1.1 创建SpringBoot项目

在这里插入图片描述

在这里插入图片描述

4.1.2 修改pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>0.0.1</version>
    <name>springboot-kafka</name>
    <description>Kafka project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-json</artifactId>
            <version>5.8.11</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-db</artifactId>
            <version>5.8.11</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

在这里插入图片描述

4.1.3 在resources中增加application.yml文件

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: all
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 0
    consumer:
      group-id: test#消费者组
      #消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
      # earliest:无提交记录,从头开始消费
      #latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest
      enable-auto-commit: true #是否自动提交偏移量offset
      auto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交的频率
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        #poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000
        #每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        #0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        #仍然会返回该消息,以确保消费者可以进行
        #max.partition.fetch.bytes=1048576  #1M
    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      #ack-mode: manual_immediate
      missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
    template:
      default-topic: "test"
server:
  port: 9999

4.2 编写功能代码

4.2.1 创建配置类:SpringBootKafkaConfig

package com.atguigu.springkafka.config;

public class SpringBootKafkaConfig {
    public static final String TOPIC_TEST = "test";
    public static final String GROUP_ID = "test";
}

在这里插入图片描述

4.2.2 创建生产者控制器:KafkaProducerController

package com.atguigu.springkafka.controller;

import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.*;

import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaProducerController {


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ResponseBody
    @PostMapping(value = "/produce", produces = "application/json")
    public String produce(@RequestBody Object obj) {

        try {
            String obj2String = JSONUtil.toJsonStr(obj);
            kafkaTemplate.send(SpringBootKafkaConfig.TOPIC_TEST, obj2String);
            return "success";
        } catch (Exception e) {
            e.getMessage();
        }
        return "success";
    }
}

在这里插入图片描述

4.2.3 创建消费者:KafkaDataConsumer

package com.atguigu.springkafka.component;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;


@Component
@Slf4j
public class KafkaDataConsumer {
    @KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)
    public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        for (String message : messages) {
            final JSONObject entries = JSONUtil.parseObj(message);
            System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("data"));
            //ack.acknowledge();
        }
    }
}

在这里插入图片描述

4.3 集成测试

4.3.1 启动ZooKeeper

在这里插入图片描述

在这里插入图片描述

4.3.2 启动Kafka

4.3.3 启动应用程序

在这里插入图片描述

在这里插入图片描述

4.3.4 生产数据测试

可以采用任何的工具进行测试,我们这里采用postman发送POST数据

在这里插入图片描述

消费者消费数据

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579320.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

debian配置BIND DNS服务器

前言 局域网内有很多台主机&#xff0c;IP难以记忆。 而修改hosts文件又难以做到配置共享和统一&#xff0c;需要一台内网的DNS服务器。 效果展示 这里添加了一个域名hello.dog&#xff0c;将其指向为192.168.1.100。 同时&#xff0c;外网的域名不会受到影响&#xff0c;…

基于粒子滤波器的电池剩余使用寿命计算matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 粒子滤波器基础 4.2 电池剩余使用寿命建模与预测 4.3 粒子滤波器在电池寿命预测中的应用 5.完整工程文件 1.课题概述 基于粒子滤波器的电池剩余使用寿命计算。根据已知的数据&#xff0c;预测未来…

前端框架编译器之模板编译

编译原理概述 编译原理&#xff1a;是计算机科学的一个分支&#xff0c;研究如何将 高级程序语言 转换为 计算机可执行的目标代码 的技术和理论。 高级程序语言&#xff1a;Python、Java、JavaScript、TypeScript、C、C、Go 等。计算机可执行的目标代码&#xff1a;机器码、汇…

高级IO|从封装epoll服务器到实现Reactor服务器|Part1

从封装epoll_server到实现reactor服务器(part1) 项目复习&#xff1a;从封装epoll_server到实现reactor服务器(part1)EPOLL模式服务器初步 select, poll, epoll的优缺点epoll的几个细节封装epoll_server基本框架先写好创建监听套接字和创建epoll模型可以Accept了吗&#xff1f…

鸿蒙OpenHarmony【轻量系统 运行】 (基于Hi3861开发板)

运行 联网配置 由于Hi3861为WLAN模组&#xff0c;您可以在版本编译及烧录后&#xff0c;通过如下操作&#xff0c;使开发板实现联网功能。 保持Windows工作台和Hi3861 WLAN模组的连接状态&#xff0c;确认串口终端显示正常。 复位Hi3861 WLAN模组&#xff0c;终端界面显示“…

网络攻击日益猖獗,安全防护刻不容缓

“正在排队登录”、“账号登录异常”、“断线重连”......伴随着社交软件用户的一声声抱怨&#xff0c;某知名社交软件的服务器在更新上线2小时后&#xff0c;遭遇DDoS攻击&#xff0c;导致用户无法正常登录。在紧急维护几小时后&#xff0c;这款软件才恢复正常登录的情况。 这…

视频通话实时换脸:支持训练面部模型 | 开源日报 No.235

iperov/DeepFaceLive Stars: 19.7k License: GPL-3.0 DeepFaceLive 是一个用于 PC 实时流媒体或视频通话的人脸换装工具。 可以使用训练好的人脸模型从网络摄像头或视频中交换面部。提供多个公共面部模型&#xff0c;包括 Keanu Reeves、Mr. Bean 等。支持自己训练面部模型以…

基于数据挖掘的斗鱼直播数据可视化分析系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 随着网络直播平台的兴起&#xff0c;斗鱼直播作为其中的佼佼者&#xff0c;吸引了大量用户和观众。为了更好地理解和分析斗鱼直播中的数据&#xff0c;本项目介绍了一个基于数据挖掘的斗鱼直播数据…

【图解计算机网络】简单易懂的https原理解析

简单易懂的https原理解析 https与http的区别混合加密对称加密非对称加密混合加密解析混合加密问题 摘要算法数字证书数字证书原理为什么通过CA证书可以解决中间人攻击的问题呢&#xff1f; https握手流程 https与http的区别 http是明文传输的&#xff0c;非常不安全&#xff0…

呆马科技——智慧应急执法监管平台

在当今社会&#xff0c;安全生产的重要性日益凸显。对于各级政府和企事业单位&#xff0c;当务之急是如何高效地对突发事件进行执法管理。平台应运而生&#xff0c;旨在通过信息化、智能化技术&#xff0c;提升安全管理的效率与准确性。 一、平台特点 整合各类平台的信息资源&…

添加github SSH Key

添加github SSH Key 使用 SSH 协议&#xff0c;您可以连接远程服务器和服务并对其进行身份验证。使用 SSH 密钥&#xff0c;您可以连接到 GitHub&#xff0c;而无需在每次访问时提供您的用户名和个人访问令牌。您还可以使用 SSH 密钥来签署提交。 #3224333333qq.com替换为你自己…

6.NVIC中断配置(ST的精简ARM中断体系)

void NVIC_SetPriorityGrouping(uint32_t PriorityGroup)//设置优先级分组&#xff0c;整个项目共用一个分组 uint32_t NVIC_EncodePriority (uint32_t PriorityGroup, uint32_t PreemptPriority, uint32_t SubPriority) //计算优先级编码值&#xff0c;&#xff08;组号&…

Python爬虫--Ajax异步抓取腾讯视频评论

在某些网站 &#xff0c;当我们滑下去的时候才会显示出后面的内容 就像淘宝一样&#xff0c;滑下去才逐渐显示其他商品 这个就是采用 Ajax 做的 然后我们现在就是要编写这样的爬虫。 规律分析&#xff1a; 这个时候就要用到我们的 Fiddler 了 我们需要分析加载评论的规律 …

GateWay具体的使用之局部过滤器接口耗时

1.找规律 局部过滤器命名规则 XXXGatewayFilterFactory&#xff0c; 必须以GatewayFilterFactory结尾。 /* 注意名称约定 * AddRequestHeaderGatewayFilterFactory 配置的时候写的是 AddRequestHeader * AddRequestParameterGatewayFilterFactory 配置的时候写的是 A…

【语音识别】搭建本地的语音转文字系统:FunASR(离线不联网即可使用)

参考自&#xff1a; 参考配置&#xff1a;FunASR/runtime/docs/SDK_advanced_guide_offline_zh.md at main alibaba-damo-academy/FunASR (github.com)参考配置&#xff1a;FunASR/runtime/quick_start_zh.md at 861147c7308b91068ffa02724fdf74ee623a909e alibaba-damo-aca…

上位机图像处理和嵌入式模块部署(树莓派4b下使用sqlite3)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 嵌入式设备下面&#xff0c;有的时候也要对数据进行处理和保存。如果处理的数据不是很多&#xff0c;一般用json就可以。但是数据如果量比较大&…

CISSP通关学习笔记:共计 9 个章节(已完结)

1. 笔记说明 第 0 章节为开篇介绍&#xff0c;不包括知识点。第 1 - 8 章节为知识点梳理汇总&#xff0c;8 个章节的知识框架关系如下图所示&#xff1a; 2. 笔记目录 「 CISSP学习笔记 」0.开篇「 CISSP学习笔记 」1.安全与风险管理「 CISSP学习笔记 」2.资产安全「 CISSP…

Ps 滤镜:置换(工作原理篇)

执行“置换”滤镜时&#xff0c;目标图像会发生位移变形&#xff0c;而变形的程度及方向与一个称为“置换图”的 PSD 文件有密切关系。 总体而言&#xff0c;置换所产生的位移变形是基于置换图的通道中的灰度信息进行的。 一、当置换图是灰度模式文件时 在灰度模式下&#xff0…
最新文章