百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 博客教程 > 正文

使用 Kafka 和 Spring Boot 开发全栈 Java 应用程序

connygpt 2024-11-12 10:08 6 浏览

每日分享最新,最流行的软件开发知识与最新行业趋势,希望大家能够一键三连,多多支持,跪求关注,点赞,留言。


本教程展示了如何在 Spring Boot 应用程序中发布和订阅 Kafka 消息,以及如何在浏览器中实时显示消息。

你将构建什么
您将构建一个通过 Kafka 发送和接收消息的全栈响应式 Web 应用程序。该应用程序在服务器上使用 Spring Boot 和 Java,在客户端使用 Lit 和 TypeScript,以及用于组件和通信的 Hilla 框架。


带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。


你需要什么
20分钟
Java 11 或更新版本
节点 16.14 或更高版本
一个同时支持 Java 和 TypeScript 的 IDE,例如VS Code。
技术概述
卡夫卡
Apache Kafka 是一个分布式事件流平台。您可以将其视为类固醇上的发布/订阅系统。Kafka 生产者可以向主题发送消息,然后消费者可以读取这些消息。但是,与大多数 pub/sub 系统不同,当您阅读这些消息时,它们不会从主题中删除。这允许您执行流处理以实时分析、聚合或转换来自不同事件的数据。

如果您想了解 Kafka 的基础知识,我强烈建议您观看 Tim Berglund 的视频:



Spring Boot 和 Spring Kafka
Spring Boot 是一种使用 Spring 的固执己见的方式。它通过依赖于配置的约定将配置代码的数量减少到最低限度。此外,Spring Kafka 增加了对配置 Kafka 生产者和消费者以及通过注解方法监听传入消息的支持。

Hilla
Hilla 是一个为 Java 构建的前端框架。它结合了 Spring Boot 后端和 Lit 内置的响应式 TypeScript 前端。Hilla 会根据您的服务器端点签名自动生成 TypeScript 类型,这有助于在您开发应用程序时保持前端和后端同步。

下载并运行 Kafka
本教程使用本地 Kafka 代理。按照以下步骤在您的计算机上下载并启动 Kafka:

进入Kafka下载页面,下载Kafka。
提取下载的存档tar -xzf kafka<,version>.tgz
打开目录cd kafka_<version>
启动 Zookeeper 管理本地 Kafka 集群bin/zookeeper-server-start.sh config/zookeeper.properties
打开第二个终端并运行bin/kafka-server-start.sh config/server.properties以启动 Kafka 代理。
您现在已经运行了 Kafka,并准备开始构建您的应用程序。
创建一个新项目
首先创建一个新的 Hilla 项目。这将为您提供一个配置了 TypeScript-Lit 前端的 Spring Boot 项目。

使用 Vaadin CLI 初始化项目:npx @vaadin/cli init --hilla --empty hilla-kafka
在您选择的 IDE 中打开项目。
使用包含的 Maven 包装器启动应用程序。该命令将下载 Maven 和 npm 依赖项并启动开发服务器。注意:初始启动可能需要几分钟时间。然而,随后的启动几乎是瞬间的。./mvnw
添加 Kafka Spring 依赖项
pom.xml通过在文件中包含以下依赖项,向应用程序添加 Kafka 支持<dependencies>:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>


父 Spring Boot BOM 涵盖了依赖项,因此您无需显式添加版本号。

定义数据模型
首先创建一个新的 Java 包:com.example.application.model.

在这个新创建的包中,创建一个新的 Java 类 ,Message.java来表示您将通过 Kafka 发送的消息。然后,将以下内容添加到类中:

package com.example.application.model;


import java.time.Instant;
import dev.hilla.Nonnull;

public class Message {

private @Nonnull String text;
private Instant time;
private @Nonnull String userName;

public String getText() {
return text;
}

public void setText(String text) {
this.text = text;
}

public Instant getTime() {
return time;
}

public void setTime(Instant time) {
this.time = time;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

}


Hilla 框架使用@Nonnull注解来指导 TypeScript 类型生成:它们对 Java 行为没有影响。

使用 Kafka 发送自定义对象
在本教程中,您将发送 Java 对象作为消息,而不是使用像字符串或数字这样的原始方法。为此,您需要创建自定义序列化器和反序列化器。

在同一个包中,创建以下两个新类,MessageSerializer.java并MessageDeserializer.java具有以下内容:

package com.example.application.model;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class MessageSerializer implements Serializer<Message> {

public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();

@Override
public byte[] serialize(String topic, Message message) {
try {
return mapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
}


package com.example.application.model;

import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class MessageDeSerializer implements Deserializer<Message> {

public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();

@Override
public Message deserialize(String topic, byte[] data) {
try {
return mapper.readValue(data, Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}


序列化器和反序列化器使用 Jackson 将对象与 JSON 进行转换。findAndAddModules()builder 方法允许 Jackson 通过您添加的依赖项支持 JSR310 数据类型。

配置Kafka
接下来,通过将以下内容添加到src/main/resources/application.properties文件中来配置 Kafka:

# A custom property to hold the name of our Kafka topic:
topic.name=chat

# Set up Kafka:
spring.kafka.bootstrap-servers=localhost:9092

# Configure the consumer:
spring.kafka.consumer.client-id=chat-consumer
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.application.model.MessageDeSerializer

# Configure the producer:
spring.kafka.producer.client-id=chat-producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.application.model.MessageSerializer


更新Application.java以编程方式配置主题。

package com.example.application;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.vaadin.flow.component.page.AppShellConfigurator;
import com.vaadin.flow.server.PWA;
import com.vaadin.flow.theme.Theme;

/**
* The entry point of the Spring Boot application.
*
* Use the @PWA annotation to make the application installable on phones, tablets, and some desktop
* browsers.
*
*/

@SpringBootApplication
@Theme(value = "hilla-kafka")\
@PWA(name = "hilla-kafka", shortName = "hilla-kafka", offlineResources = {})
@Configuration

public class Application implements AppShellConfigurator {

@Value("${topic.name}")
private String topicName;

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
NewTopic chatTopic() {

return TopicBuilder
.name(topicName)
.partitions(1)
.replicas(1)
.build();
}
}


以下是解释的基本部分:

通过 Spring 注入主题名称。
使用 TopicBuilder bean 配置来定义和配置新主题。在这个示例应用程序中,您只设置了一个分区和一个副本。在实际应用中,您会希望设置更多的分区和副本,以确保集群运行良好且可靠。
创建服务器端点
您现在已准备好开始使用 Kafka。接下来,创建将与 Kafka 代理和客户端 Web 应用程序通信的服务器端点。

MessageEndpoint.java,在包中创建一个新的 Java 文件com.example.application并将以下代码添加到其中:

package com.example.application;

import java.time.Instant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import com.example.application.model.Message;
import com.vaadin.flow.server.auth.AnonymousAllowed;
import dev.hilla.Endpoint;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;

@Endpoint
@AnonymousAllowed
public class MessageEndpoint {

@Value("${topic.name}")
private String topicName;

private final Many<Message> chatSink;
private final Flux<Message> chat;

private final KafkaTemplate<String, Message> kafkaTemplate;

MessageEndpoint(KafkaTemplate<String, Message> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
chatSink = Sinks.many().multicast().directBestEffort();
chat = chatSink.asFlux();
}

public Flux<Message> join() {
return chat;
}

public void send(Message message) {
message.setTime(Instant.now());
kafkaTemplate.send(topicName, message);
}

@KafkaListener(topics = "chat", groupId = "chat-group")
private void consumer(Message message) {
chatSink.emitNext(message,
(signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED);
}
}


以下是解释的基本部分:

@Endpoint 注释告诉 Hilla 将所有公共方法作为 TypeScript 方法提供给客户端。@AnonymousAllowed 关闭此端点的身份验证。
chatSink 是一种将数据传递给系统的编程方式。它发出消息,以便任何订阅了相关聊天 Flux 的客户端都会收到它们。
构造函数获取 Spring 注入的 KafkaTemplate 并将其保存到字段中。
join() 方法返回您将在客户端订阅的聊天 Flux。
send() 方法接收一条消息,用发送时间标记它,然后使用 kafkaTemplate 发送它。
consumer() 方法有一个 @KafkaListener 注释,它告诉 Spring Kafka 在传入消息上运行此方法。该方法将接收到的消息发送到 chatSink,它会通知所有订阅了聊天 Flux 的客户端。
启用反应式端点
在编写本教程 (1.2) 时,Hilla 的当前版本通过功能标志支持 Flux 端点方法。通过创建一个src/main/resources/vaadin-featureflags.properties,包含以下内容的新文件来启用该功能:

# Push support in Hilla
com.vaadin.experimental.hillaPush=true


创建用于发送和接收消息的视图
现在您已经配置了 Kafka 并设置了服务器以发送和接收消息,最后一步是创建一个可用于发送和接收消息的 Web 视图。

Hilla 包括 Vaadin 组件集,其中包含 40 多个组件。您可以使用<vaadin-message-list>和<vaadin-message-input>组件来构建主聊天 UI。您还可以使用该<vaadin-text-field>组件来捕获当前用户的姓名。

Hilla 使用 Lit 创建视图。Lit 在概念上类似于 React:组件由状态和模板组成。每当状态发生变化时,模板都会重新呈现。

首先重命名生成的占位符视图。frontend/views/empty/empty-view.ts将文件夹和文件重命名为frontend/views/messages/messages-view.ts. 用以下代码替换文件的内容:

import { View } from "Frontend/views/view";
import { customElement, state } from "lit/decorators.js";
import { html } from "lit";
import "@vaadin/message-list";
import "@vaadin/message-input";
import "@vaadin/text-field";
import { TextFieldChangeEvent } from "@vaadin/text-field";
import { MessageEndpoint } from "Frontend/generated/endpoints";
import Message from "Frontend/generated/com/example/application/model/Message";

@customElement("messages-view")
export class MessagesView extends View {
@state() messages: Message[] = [];
@state() userName = "";

render() {
return html`
<h1 class="m-m">Kafka message center</h1>
<vaadin-message-list
class="flex-grow"
.items=${this.messages}
></vaadin-message-list>
<div class="flex p-s gap-s items-baseline">
<vaadin-text-field
placeholder="Your name"
@change=${this.userNameChange}
></vaadin-text-field>
<vaadin-message-input
class="flex-grow"
@submit=${this.submit}
></vaadin-message-input>
</div>
`;
}

userNameChange(e: TextFieldChangeEvent) {
this.userName = e.target.value;
}

async submit(e: CustomEvent) {
MessageEndpoint.send({
text: e.detail.value,
userName: this.userName,
});
}

connectedCallback() {
super.connectedCallback();

this.classList.add("flex", "flex-col", "h-full", "box-border");

MessageEndpoint.join().onNext(
(message) => (this.messages = [...this.messages, message])
);
}
}


以下是解释的基本部分:

Lit 跟踪 @state() 修饰的属性,并且只要它们发生变化,模板就会重新渲染。
Message 数据类型由 Hilla 根据您在服务器上创建的 Java 对象生成。
消息列表通过 .items=${this.messages} 绑定到消息列表组件。items 前面的句点告诉 Lit 将数组作为属性而不是属性传递。
每当使用 @change=${this.userNameChange} 更改值时,文本字段都会调用 userNameChange 方法(@ 表示事件侦听器)。
消息输入组件在提交时调用 MessageEndpoint.send()。请注意,您正在调用 TypeScript 方法。Hilla 负责调用服务器上的底层 Java 方法。
最后,在 connectedCallback 中调用 MessageEndpoint.join() 开始接收传入的聊天消息。
除了 Vaadin 组件之外,您还使用Hilla CSS 实用程序类进行基本布局(flex、flex-grow、flex-col)。
最后,更新路由以匹配视图的新名称。将 的内容替换为routes.ts以下内容:

import { Route } from "@vaadin/router";
import "./views/messages/messages-view";

export const routes: Route[] = [{ path: "", component: "messages-view" }];


运行已完成的应用程序
如果您的应用程序仍在运行,请重新启动它。服务器启动后,您可以通过 http://localhost:8080 访问应用程序。尝试在多个浏览器中打开应用程序,以查看所有浏览器实时显示的消息。

./mvnw




带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。

相关推荐

3分钟让你的项目支持AI问答模块,完全开源!

hello,大家好,我是徐小夕。之前和大家分享了很多可视化,零代码和前端工程化的最佳实践,今天继续分享一下最近开源的Next-Admin的最新更新。最近对这个项目做了一些优化,并集成了大家比较关注...

干货|程序员的副业挂,12个平台分享

1、D2adminD2Admin是一个完全开源免费的企业中后台产品前端集成方案,使用最新的前端技术栈,小于60kb的本地首屏js加载,已经做好大部分项目前期准备工作,并且带有大量示例代码,助...

Github标星超200K,这10个可视化面板你知道几个

在Github上有很多开源免费的后台控制面板可以选择,但是哪些才是最好、最受欢迎的可视化控制面板呢?今天就和大家推荐Github上10个好看又流行的可视化面板:1.AdminLTEAdminLTE是...

开箱即用的炫酷中后台前端开源框架第二篇

#头条创作挑战赛#1、SoybeanAdmin(1)介绍:SoybeanAdmin是一个基于Vue3、Vite3、TypeScript、NaiveUI、Pinia和UnoCSS的清新优...

搭建React+AntDeign的开发环境和框架

搭建React+AntDeign的开发环境和框架随着前端技术的不断发展,React和AntDesign已经成为越来越多Web应用程序的首选开发框架。React是一个用于构建用户界面的JavaScrip...

基于.NET 5实现的开源通用权限管理平台

??大家好,我是为广大程序员兄弟操碎了心的小编,每天推荐一个小工具/源码,装满你的收藏夹,每天分享一个小技巧,让你轻松节省开发效率,实现不加班不熬夜不掉头发,是我的目标!??今天小编推荐一款基于.NE...

StreamPark - 大数据流计算引擎

使用Docker完成StreamPark的部署??1.基于h2和docker-compose进行StreamPark部署wgethttps://raw.githubusercontent.com/a...

教你使用UmiJS框架开发React

1、什么是Umi.js?umi,中文可发音为乌米,是一个可插拔的企业级react应用框架。你可以将它简单地理解为一个专注性能的类next.js前端框架,并通过约定、自动生成和解析代码等方式来辅助...

简单在线流程图工具在用例设计中的运用

敏捷模式下,测试团队的用例逐渐简化以适应快速的发版节奏,大家很早就开始运用思维导图工具比如xmind来编写测试方法、测试点。如今不少已经不少利用开源的思维导图组件(如百度脑图...)来构建测试测试...

【开源分享】神奇的大数据实时平台框架,让Flink&amp;Spark开发更简单

这是一个神奇的框架,让Flink|Spark开发更简单,一站式大数据实时平台!他就是StreamX!什么是StreamX大数据技术如今发展的如火如荼,已经呈现百花齐放欣欣向荣的景象,实时处理流域...

聊聊规则引擎的调研及实现全过程

摘要本期主要以规则引擎业务实现为例,陈述在陌生业务前如何进行业务深入、调研、技术选型、设计及实现全过程分析,如果你对规则引擎不感冒、也可以从中了解一些抽象实现过程。诉求从硬件采集到的数据提供的形式多种...

【开源推荐】Diboot 2.0.5 发布,自动化开发助理

一、前言Diboot2.0.5版本已于近日发布,在此次发布中,我们新增了file-starter组件,完善了iam-starter组件,对core核心进行了相关优化,让devtools也支持对IAM...

微软推出Copilot Actions,使用人工智能自动执行重复性任务

IT之家11月19日消息,微软在今天举办的Ignite大会上宣布了一系列新功能,旨在进一步提升Microsoft365Copilot的智能化水平。其中最引人注目的是Copilot...

Electron 使用Selenium和WebDriver

本节我们来学习如何在Electron下使用Selenium和WebDriver。SeleniumSelenium是ThoughtWorks提供的一个强大的基于浏览器的开源自动化测试工具...

Quick &#39;n Easy Web Builder 11.1.0设计和构建功能齐全的网页的工具

一个实用而有效的应用程序,能够让您轻松构建、创建和设计个人的HTML网站。Quick'nEasyWebBuilder是一款全面且轻巧的软件,为用户提供了一种简单的方式来创建、编辑...