Spring Boot 整合 Reactor实例详解

2023-01-21 0 1,099

目录

引言

Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目

1 创建项目

使用 ;https://start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

Spring Boot 整合 Reactor实例详解

然后导入 Reactor 包

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

2 集成 H2 数据库

application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

server.port=8081
################ H2 数据库 基础配置 ##############
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.url=jdbc:h2:~/user
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database=h2
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.path=/h2-console
spring.h2.console.enable=true

3 创建测试

3.1 user 实体

建立简单数据操作实体 User。

import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:40
 */
@Data
@NoArgsConstructor
@Table(name = \"t_user\")
@Entity
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    private String userName;
    private int age;
    private String sex;
    public User(String userName, int age, String sex) {
        this.userName = userName;
        this.age = age;
        this.sex = sex;
    }
}

3.2 UserRepository

数据模型层使用 JPA 框架。

import com.prepared.user.domain.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}

3.3 UserService

service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

doOnError 监控异常情况;

doFinally 监控整体执行情况,如:耗时、调用量监控等。

import com.prepared.user.dao.UserRepository;
import com.prepared.user.domain.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.List;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Service
public class UserService {
    private Logger logger = LoggerFactory.getLogger(UserService.class);
    @Resource
    private UserRepository userRepository;
    public Mono<Boolean> save(User user) {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.save(user) != null;
                })
                .doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error(\"save.user.error, user={}, e\", user, e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info(\"save.user.time={}, user={}\", user, System.currentTimeMillis() - startTime);
                });
    }
    public Mono<User> findById(Long id) {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.getReferenceById(id);
                }).doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error(\"findById.user.error, id={}, e\", id, e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info(\"findById.user.time={}, id={}\", id, System.currentTimeMillis() - startTime);
                });
    }
    public Mono<List<User>> list() {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.findAll();
                }).doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error(\"list.user.error, e\", e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info(\"list.user.time={}, \", System.currentTimeMillis() - startTime);
                });
    }
  public Flux<User> listFlux() {
        long startTime = System.currentTimeMillis();
        return Flux.fromIterable(userRepository.findAll())
                .doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error(\"list.user.error, e\", e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info(\"list.user.time={}, \", System.currentTimeMillis() - startTime);
                });
    }
}

3.4 UserController

controller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

返回List可以使用Mono<List<User>> ,也可以使用 Flux<User>

  • Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素
  • Flux<T> 是一个标准的 Publisher<T>,表示为发出 0 到 N 个元素的异步序列
import com.prepared.user.domain.User;
import com.prepared.user.service.UserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:47
 */
@RestController
public class UserController {
    @Resource
    private UserService userService;
    @RequestMapping(\"/add\")
    public Mono<Boolean> add() {
        User user = new User(\"xiaoming\", 10, \"F\");
        return userService.save(user) ;
    }
    @RequestMapping(\"/list\")
    public Mono<List<User>> list() {
        return userService.list();
    }
}
    @RequestMapping(\"/listFlux\")
    public Flux<User> listFlux() {
        return userService.listFlux();
    }

3.5 SpringReactorApplication 添加注解支持

Application 启动类添加注解 @EnableJpaRepositories

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
 * Hello world!
 */
@SpringBootApplication
@EnableJpaRepositories
public class SpringReactorApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringReactorApplication.class, args);
    }
}

测试

启动项目,访问 localhost:8081/add,正常返回 true。

Spring Boot 整合 Reactor实例详解

查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

Spring Boot 整合 Reactor实例详解

后台日志:

2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181,

执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。

总结

响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

  • Future 的 get() 方法;
  • Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
  • 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度

以上就是Spring Boot 整合 Reactor实例详解的详细内容,更多关于Spring Boot 整合 Reactor的资料请关注其它相关文章!

:本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可, 转载请附上原文出处链接。
1、本站提供的源码不保证资源的完整性以及安全性,不附带任何技术服务!
2、本站提供的模板、软件工具等其他资源,均不包含技术服务,请大家谅解!
3、本站提供的资源仅供下载者参考学习,请勿用于任何商业用途,请24小时内删除!
4、如需商用,请购买正版,由于未及时购买正版发生的侵权行为,与本站无关。
5、本站部分资源存放于百度网盘或其他网盘中,请提前注册好百度网盘账号,下载安装百度网盘客户端或其他网盘客户端进行下载;
6、本站部分资源文件是经压缩后的,请下载后安装解压软件,推荐使用WinRAR和7-Zip解压软件。
7、如果本站提供的资源侵犯到了您的权益,请邮件联系: 442469558@qq.com 进行处理!

猪小侠源码-最新源码下载平台 Java教程 Spring Boot 整合 Reactor实例详解 http://www.20zxx.cn/463862/xuexijiaocheng/javajc.html

猪小侠源码,优质资源分享网

常见问题
  • 本站所有资源版权均属于原作者所有,均只能用于参考学习,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,建议提前注册好百度网盘账号,使用百度网盘客户端下载
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务