有些项目须要大量时间才能运行特定功能:电子商务场景或系统,当付款提供商确认付款过程后,该系统须要发送电子邮件。做为开发人员,咱们知道让用户等待是不可行的。html
在付款的状况下,应用程序须要在付款完成后发送电子邮件。建立要异步执行的任务队列是一种处理大量数据而又不影响用户并使用户满意的绝佳方法。这篇文章的目的是讨论如何使用Spring和RabbitMQ使用Java建立这些异步调用。java
RabbitMQ是一种开源消息代理软件,它将消息从发送者的正式消息传递协议转换为接收者的正式消息传递协议。换句话说,RabbitMQ是生产者-消费者实现,生产者处理消息,而消费者是运行该过程的客户。mysql
为了展现RabbitMQ的工做原理,咱们将建立一个平滑的示例来管理具备如下三种状态的汽车:新车什么时候,该车什么时候售出以及该车被识别为垃圾车。咱们但愿将其存储在关系数据库中,并有两个表:一个用于放置当前汽车状态,第二个用于存储有关汽车的历史信息。所以,对于每一个新事件,咱们都会向RabbitMQ触发一个事件,以便异步地对新客户端执行该事件。git
该项目演示将是一个Maven项目。所以,第一步是定义项目相关性,例如 将Spring Boot,Spring Data,MySQL驱动程序和RabbitMQ客户端插入pom.xml文件。web
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sh.platform.start</groupId>
<artifactId>spring-boot-jms</artifactId>
<version>0.0.1</version>
<properties>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq.jms</groupId>
<artifactId>rabbitmq-jms</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>sh.platform</groupId>
<artifactId>config</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<build>
<finalName>spring-boot-jms</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>oss.sonatype.org-snapshot</id>
<url>http://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
</project>
下一步是配置类 这些类负责提供数据源以链接到数据库,并提供链接工厂供客户端用来与JMS提供程序建立链接。spring
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import sh.platform.config.Config;
import sh.platform.config.MySQL;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Bean(name = "dataSource")
public DataSource getDataSource() {
Config config = new Config();
MySQL database = config.getCredential("database", MySQL::new);
return database.get();
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import sh.platform.config.Config;
import sh.platform.config.RabbitMQ;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class JMSConfig {
private ConnectionFactory getConnectionFactory() {
Config config = new Config();
final RabbitMQ rabbitMQ = config.getCredential("rabbitmq", RabbitMQ::new);
return rabbitMQ.get();
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
ConnectionFactory connectionFactory = getConnectionFactory();
return new CachingConnectionFactory(connectionFactory);
}
}
建立配置后,下一步是定义实体。这些实体是业务的核心,将表明咱们将从数据库建立/写入并集成到队列中的实例。在此示例中,有两个实体:Car实体(其中咱们拥有汽车的当前状态)和保存操做状态的实体CarLog
。sql
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.Objects;
@Entity
public class Car {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private String plate;
@Column
private String model;
@Column
private Integer age;
@Column
private String color;
public Long getId() {
return id;
}
public String getModel() {
return model;
}
public Integer getAge() {
return age;
}
public String getColor() {
return color;
}
public String getPlate() {
return plate;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Car car = (Car) o;
return Objects.equals(id, car.id);
}
@Override
public int hashCode() {
return Objects.hashCode(id);
}
@Override
public String toString() {
return "Car{" +
"id=" + id +
", plate='" + plate + '\'' +
", model='" + model + '\'' +
", age=" + age +
", color='" + color + '\'' +
'}';
}
}
public enum CarStatus {
NEW, JUNK, SOLD;
}
import javax.persistence.*;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Objects;
@Entity
public class CarLog {
private static final ZoneId UTC = ZoneId.of("UTC");
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private String plate;
@Column
private String model;
@Column
private LocalDateTime date = LocalDateTime.now(UTC);
@Column
@Enumerated(value = EnumType.STRING)
private CarStatus status;
public Long getId() {
return id;
}
public String getPlate() {
return plate;
}
public String getModel() {
return model;
}
public CarStatus getStatus() {
return status;
}
public LocalDateTime getDate() {
return date;
}
public static CarLog newCar(Car car) {
return of(car, CarStatus.NEW);
}
public static CarLog junk(Car car) {
return of(car, CarStatus.JUNK);
}
public static CarLog sold(Car car) {
return of(car, CarStatus.SOLD);
}
private static CarLog of(Car car, CarStatus status) {
Objects.requireNonNull(car, "car is required");
CarLog log = new CarLog();
log.plate = car.getPlate();
log.model = car.getModel();
log.status = status;
return log;
}
}
以后,一旦定义了Spring Data实体,下一步就是建立存储库的接口。Spring数据存储库抽象的目标是显着减小实现各类持久性存储的数据访问层所需的样板代码量。数据库
import org.springframework.data.repository.PagingAndSortingRepository;
public interface CarRepository extends PagingAndSortingRepository<Car, Long> {
}
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
public interface CarLogRepository extends PagingAndSortingRepository<CarLog, Long> {
List<CarLog> findByPlate(String plate);
List<CarLog> findByModel(String model);
List<CarLog> findByStatus(CarStatus status);
}
在MVC模式中,控制器是模型和视图之间的层,这就是咱们接下来要建立的控制器类。在CarController层中,有一个JmsTemplate,能够很是轻松地将消息发送到JMS目标。apache
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("cars")
public class CarController {
@Autowired
private CarRepository repository;
@Autowired
private JmsTemplate template;
@PostMapping
@ResponseStatus(code = HttpStatus.CREATED)
public String save(@RequestBody Car car) {
repository.save(car);
template.convertAndSend("new", car);
return "Saved- " + car.getModel();
}
@GetMapping(value = "/{id}", produces = "application/json")
public Car get(@PathVariable("id") long id) {
return repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
}
@GetMapping(produces = "application/json")
public Iterable<Car> get() {
return repository.findAll();
}
@PutMapping(value = "/{id}", produces = "application/json")
public Car update(@PathVariable("id") long id, @RequestBody Car car) {
repository.save(car);
return car;
}
@DeleteMapping(value = "junk/{id}", produces = "application/json")
public Car junk(@PathVariable("id") long id) {
Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
repository.deleteById(id);
template.convertAndSend("junk", car);
return car;
}
@DeleteMapping(value = "sold/{id}", produces = "application/json")
public Car sold(@PathVariable("id") long id) {
Car car = repository.findById(id).orElseThrow(() -> new RuntimeException("Not found"));
repository.deleteById(id);
template.convertAndSend("sold", car);
return car;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("logs")
public class CarLogController {
@Autowired
private CarLogRepository repository;
@GetMapping(produces = "application/json")
public Iterable<CarLog> get() {
return repository.findAll();
}
@GetMapping(value = "{plate}", produces = "application/json")
public Iterable<CarLog> getHistoric(@PathVariable("plate") String plate) {
return repository.findByPlate(plate);
}
@GetMapping(value = "models/{model}", produces = "application/json")
public Iterable<CarLog> get(@PathVariable("model") String model) {
return repository.findByModel(model);
}
@GetMapping(value = "status/{status}", produces = "application/json")
public Iterable<CarLog> get(@PathVariable("status") CarStatus status) {
return repository.findByStatus(status);
}
}
在CarLogController
层中,咱们仅看到GET动词,这意味着它是只读控制器。可是信息将如何进入数据库?在CarController
层中,客户端将消息发送到RabbitMQ队列。json
接下来,是时候讨论将读取此队列的类了。CarEventReceiver
类为JmsListener
批注提供了几种方法,其属性表示该方法的队列。 它会侦听并等待消息读取和处理。若是再看一下如何在CarController
层类中使用JmsTemplate
,则第一个参数是一个String
,它提供将信息发送到的队列名称做为第二个参数。Spring JMS经过容许生产和注释的模板轻松地链接消费者和生产者,从而使信息更易使用。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CarEventReceiver {
@Autowired
private CarLogRepository repository;
@JmsListener(destination = "new")
public void newCar(Car car) {
CarLog log = CarLog.newCar(car);
repository.save(log);
}
@JmsListener(destination = "junk")
public void junk(Car car) {
CarLog log = CarLog.junk(car);
repository.save(log);
}
@JmsListener(destination = "sold")
public void sold(Car car) {
CarLog log = CarLog.sold(car);
repository.save(log);
}
}
Java应用程序已准备就绪!下一步是设置管理和部署应用程序所需的Platform.sh的文件。在咱们的第一篇Java文章中,咱们深刻研究了这三个文件的每一个细节:
一台路由器(.platform/routes.yaml). Platform.sh容许你定义路由。
零个或多个服务容器(.platform/services.yaml). Platform.sh容许你彻底定义和配置要在项目上使用的拓扑和服务。
一个或多个应用程序容器(.platform.app.yaml)。 你能够经过一个配置文件控制应用程序以及在Platform.sh上构建和部署应用程序的方式。
在这篇文章中将要更改的文件是服务文件,使你能够定义数据库,搜索引擎,缓存等。在此项目中,咱们将设置MariaDB和RabbitMQ。
db:
type: mariadb:10.4
disk: 512
queuerabbit:
type: rabbitmq:3.7
disk: 512
在应用程序文件中,咱们将更改关系以容许咱们的应用程序访问服务。要指出的是,从安全角度来看,这种访问是一项基本功能。所以,在微服务场景中,咱们能够确保金融应用程序访问金融服务等等。
# This file describes an application. You can have multiple applications
# in the same project.
#
# See https://docs.platform.sh/user_guide/reference/platform-app-yaml.html
# The name of this app. Must be unique within a project.
name: app
# The runtime the application uses.
type: "java:8"
disk: 1024
# The hooks executed at various points in the lifecycle of the application.
hooks:
build: mvn clean install
# The relationships of the application with services or other applications.
#
# The left-hand side is the name of the relationship as it will be exposed
# to the application in the PLATFORM_RELATIONSHIPS variable. The right-hand
# side is in the form `<service name>:<endpoint name>`.
relationships:
database: "db:mysql"
rabbitmq: "queuerabbit:rabbitmq"
# The configuration of app when it is exposed to the web.
web:
commands:
start: java -jar -Xmx512m target/spring-boot-jms.jar --server.port=$PORT
如今,该应用程序已经准备就绪,如今能够经过如下步骤使用Platform.sh将其移至云中:
建立一个新的免费试用账户。
使用新的用户名和密码注册,或使用当前的GitHub,Bitbucket或Google账户登陆。若是你使用第三方登陆,则之后能够为Platform.sh账户设置密码。
选择你的网站应居住的世界区域。
选择空白模板。
使用该向导后,Platform.sh将为你提供整个基础结构,并为你的项目提供一个远程Git存储库。Platform.sh Git驱动的基础结构意味着它将自动管理你的应用程序将其推送到主远程存储库所需的一切。设置SSH密钥后,只需编写代码(包括一些用于指定所需基础结构的YAML文件),而后将其提交到Git并推送便可。
git remote add platform <platform.sh@gitrepository>
git commit -m "Initial project"
git push -u platform master
推送的代码将建立Java应用程序和服务实例,并在完成后将IP地址返回给服务。让咱们测试一下该应用程序。
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":1,"plate":"AB-0001-AB","model":"Vogel","age":2012,"color":"green"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":2,"plate":"AB-0003-AB","model":"Renault","age":2018,"color":"red"}'
curl -X POST -k -H 'Content-Type: application/json' -i 'https://<host_addresss>/cars' --data '{"id":3,"plate":"AB-0006-AB","model":"Peugeot","age":2019"color":"black"}'
curl -X GET -i 'https://<host_address>/logs'
在本文中,咱们学习了如何使用RabbitMQ和Spring优化整个系统的异步通讯。这种策略将使你的应用程序具备更大的可伸缩性,并防止用户等待过久才能得到队列/主题使用者的答案。具备任何异步通讯的体系结构容许(例如)第二个应用程序从代理读取和处理信息,或者在系统须要的状况下拥有多个消费者。
感谢阅读!
另外近期整理了一套完整的【java架构思惟导图】,分享给一样正在认真学习的每位朋友~还有更多JVM、Mysql、Tomcat、Spring Boot、Spring Cloud、Zookeeper、Kafka、RabbitMQ、RockerMQ、Redis、ELK、Git等Java干货,欢迎私信交流!