一、前言
在总结Spring Cloud Bus消息总线的时候,需要用到RabbitMQ消息中间件,由于之前对MQ还不是很熟悉,所以花了一点时间研究了一下RabbitMQ。
二、简介
RabbitMQ 是一个消息中间件,以异步的方式处理消息,实现了与业务之间的解耦,同时还有消息分发,消息缓存等功能。RabbitMQ使用的是AMQP协议,全名是提供统一消息服务的应用层标准高级消息队列协议。默认启动端口 5672。
在RabbitMQ中主要的概念有:交换机,队列,绑定,路由键等。交换机主要有:
Directed Exchange: 路由键(route-key)方式分发消息
Topic Exchange: 通配符方式分发消息
Fanout Exchange:扇形交换机,这种交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe(发布/订阅)模式。用来做广播最好。所有该exchagne上指定的routing-key都会被ignore掉。(原因:因为route-key无效)
Header Exchange: 设置header attribute参数类型的交换机(用的不多,常见的是前面三种)
本文主要讲解RabbitMQ Directed Exchange路由键交换机的使用方法,至于RabbitMQ Server以及ErLang的安装可以去百度,直接exe安装即可。我们可以通过web管理界面来管理RabbitMQ的connnection、channels、exchanges、users等信息。
工作原理图:
消息发布者发送消息到交换机,交换机通过route key找到相对应的队列,监听了该队列的消息接收者都能接收该消息。RabbitMQ大概流程:
1. 消息发送者发送一条消息给交换机 >>>> 2. 交换机根据关键字匹配到对应的队列 >>>> 3. 将消息存入队列 >>>> 4. 消息接收者从队列中取出消息使用
三、准备工程
下面我们先讲解一下一个消息发送者对应一个消息接收者(一对一)
springboot_rabbitmq_direct_exchange: 端口1111
注意引入spring-boot-starter-amqp依赖,具体的pom.xml文件如下:
-
“1.0” encoding=“UTF-8” xml version=
-
<project xmlns=“http://maven.apache.org/POM/4.0.0” xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance”
-
xsi:schemaLocation=“http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
-
<modelVersion>4.0.0</modelVersion>
-
-
<groupId>com.springboot.wsh</groupId>
-
<artifactId>springboot_rabbitmq_direct_exchange</artifactId>
-
<version>0.0.1-SNAPSHOT</version>
-
<packaging>jar</packaging>
-
-
<name>springboot_rabbitmq_direct_exchange</name>
-
<description>DirectExchange路由键方式</description>
-
-
<parent>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-parent</artifactId>
-
<version>1.5.2.RELEASE</version>
-
<relativePath/> <!– lookup parent from repository –>
-
</parent>
-
-
<properties>
-
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
<java.version>1.8</java.version>
-
</properties>
-
-
<dependencies>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>com.alibaba</groupId>
-
<artifactId>fastjson</artifactId>
-
<version>1.2.40</version>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-web</artifactId>
-
</dependency>
-
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-test</artifactId>
-
<scope>test</scope>
-
</dependency>
-
</dependencies>
-
-
<build>
-
<plugins>
-
<plugin>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-maven-plugin</artifactId>
-
</plugin>
-
</plugins>
-
</build>
-
-
-
</project>
四、application.yml中配置RabbitMQ
-
-
spring:
-
rabbitmq:
-
-
port: 5672
-
-
password: guest
-
-
username: guest
-
-
host: localhost
-
-
virtual-host: /
-
-
publisher-confirms: true
-
server:
-
port: 1111
五、新建SaveUserQueueConfiguration配置类
我们新建一个常量类用于保存queue、exchange、route-key标识的类Constants.java:
-
public class Constants {
-
-
//保存用户-交换机名称
-
public static final String SAVE_USER_EXCHANGE_NAME = “user.save.direct.exchange.name”;
-
//保存用户-队列名称
-
public static final String SAVE_USER_QUEUE_NAME = “user.save.queue.name”;
-
//保存用户-队列路由键
-
public static final String SAVE_USER_QUEUE_ROUTE_KEY = “user.save.queue.route.key”;
-
-
}
在实际项目中,我们需要根据某个业务分别定义RabbitMQ配置类,主要配置queue队列、exchange交换机、binding绑定。这里模拟用户新增成功之后发送消息操作。(只是模拟,没有实际操作数据库)
-
/**
-
* @Title: SaveUserQueueConfiguration
-
* @ProjectName springboot_rabbit_mq
-
* @Description: 保存用户RabbitMQ相关配置类
-
* @Author WeiShiHuai
-
* @Date 2018/9/20 14:41
-
*/
-
-
public class SaveUserQueueConfiguration {
-
-
/**
-
* 配置交换机实例
-
*
-
* @return
-
*/
-
-
public DirectExchange directExchange() {
-
return new DirectExchange(Constants.SAVE_USER_EXCHANGE_NAME);
-
}
-
-
/**
-
* 配置队列实例,并且设置持久化队列
-
*
-
* @return
-
*/
-
-
public Queue queue() {
-
return new Queue(Constants.SAVE_USER_QUEUE_NAME, true);
-
}
-
-
/**
-
* 将队列绑定到交换机上,并设置消息分发的路由键
-
*
-
* @return
-
*/
-
-
public Binding binding() {
-
//链式写法: 用指定的路由键将队列绑定到交换机
-
return BindingBuilder.bind(queue()).to(directExchange()).with(Constants.SAVE_USER_QUEUE_ROUTE_KEY);
-
}
-
-
}
该配置类大致分为三个步骤:
a. DirectExchange交换机,为交换机设置一个名称
b. Queue队列,为消息队列设置一个名称,注意消息接收者监听的队列名称必须与消息发送者注册的队列名称一致,否则消息不能分发成功到消息接收者。
c. Binding绑定,消息绑定的目的就是将Queue实例绑定到Exchange上,并且通过设置的路由Key进行消息转发,配置了路由Key后,只有符合该路由配置的消息才会被转发到绑定交换上的消息队列。
六、定义发送消息统一业务类
定义一个名为SendMessageService 的接口,这个接口继承了RabbitTemplate.ConfirmCallback,ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知。
-
/**
-
* @Title: SendMessageService
-
* @ProjectName springboot_rabbit_mq
-
* @Description: 发送消息统一业务层
-
* @Author WeiShiHuai
-
* @Date 2018/9/20 14:53
-
* <p>
-
* 说明: 继承RabbitTemplate.ConfirmCallback接口,而ConfirmCallback接口是用来回调消息发送成功后的方法,
-
* 当一个消息被成功写入到RabbitMQ服务端时,会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知
-
*/
-
public interface SendMessageService extends RabbitTemplate.ConfirmCallback {
-
-
/**
-
* 发送消息方法
-
*
-
* @param message 发送内容
-
*/
-
void sendMessage(Object message);
-
-
}
定义实现类SendMessageServiceImpl.java:
该类注入了RabbitTemplate,RabbitTemplate封装了发送消息的方法,我们直接使用即可。可以看到我们构建了一个回调返回的数据,并使用convertAndSend方法发送了消息。同时实现了confirm回调方法,通过判断isSendSuccess可以知道消息是否发送成功,这样我们就可以进行进一步处理。
-
/**
-
* @Title: SendMessageServiceImpl
-
* @ProjectName springboot_rabbit_mq
-
* @Description: 发送消息业务层实现类
-
* @Author WeiShiHuai
-
* @Date 2018/9/20 14:59
-
*/
-
-
public class SendMessageServiceImpl implements SendMessageService {
-
-
private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);
-
-
-
private RabbitTemplate rabbitTemplate;
-
-
-
public void sendMessage(Object message) {
-
//设置回调对象
-
rabbitTemplate.setConfirmCallback(this);
-
//构建回调返回的数据
-
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
-
rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
-
logger.info(“SendMessageServiceImpl() >>> 发送消息到RabbitMQ, 消息内容: “ + message);
-
}
-
-
/**
-
* 消息回调确认方法
-
*
-
* @param correlationData 回调数据
-
* @param isSendSuccess 是否发送成功
-
* @param
-
*/
-
-
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
-
logger.info(“confirm回调方法>>>>>>>>>>>>>回调消息ID为: “ + correlationData.getId());
-
if (isSendSuccess) {
-
logger.info(“confirm回调方法>>>>>>>>>>>>>消息发送成功”);
-
} else {
-
logger.info(“confirm回调方法>>>>>>>>>>>>>消息发送失败” + s);
-
}
-
-
}
-
}
接着编写发送消息的类:注入SendMessageService 我们自定义的统一发送消息业务类。
-
/**
-
* @Title: UserService
-
* @ProjectName springboot_rabbit_mq
-
* @Description: 用户业务层接口
-
* @Author WeiShiHuai
-
* @Date 2018/9/20 14:25
-
*/
-
-
public class UserService {
-
-
-
private SendMessageService sendMessageService;
-
-
-
public Long saveUser(UserEntity userEntity) {
-
//保存用户操作
-
//这里写保存数据库操作…
-
-
//发送消息到RabbitMQ
-
sendMessageService.sendMessage(userEntity.getName());
-
return userEntity.getId();
-
}
-
-
}
这里模拟了保存用户信息之后发送一条消息。以下是一些比较简单的类:
UserEntity.java:
-
public class UserEntity implements Serializable {
-
-
private Long id;
-
/**
-
* 姓名
-
*/
-
private String name;
-
/**
-
* 年龄
-
*/
-
private int age;
-
-
public Long getId() {
-
return id;
-
}
-
-
public void setId(Long id) {
-
this.id = id;
-
}
-
-
public String getName() {
-
return name;
-
}
-
-
public void setName(String name) {
-
this.name = name;
-
}
-
-
public int getAge() {
-
return age;
-
}
-
-
public void setAge(int age) {
-
this.age = age;
-
}
-
-
}
用于测试的UserController :
-
-
public class UserController {
-
-
-
private UserService userService;
-
-
“/sendMessage”)(
-
public void sendMessage() {
-
UserEntity userEntity = new UserEntity();
-
userEntity.setAge(20);
-
userEntity.setName(“zhangsan”);
-
userService.saveUser(userEntity);
-
}
-
-
}
七、定义消息接收者
-
/**
-
* @Title: ReceiveMessage
-
* @ProjectName springboot_rabbit_mq
-
* @Description: 接收消息
-
* @Author WeiShiHuai
-
* @Date 2018/9/20 15:25
-
*/
-
-
“user.save.queue.name”)(queues =
-
public class ReceiveMessage {
-
private static Logger logger = LoggerFactory.getLogger(ReceiveMessage.class);
-
-
-
public void receiveMessage(String userName) {
-
logger.info(“消息接收成功,用户名为:” + userName);
-
//可以添加自定义业务逻辑处理
-
}
-
-
}
注意:
a. @RabbitListener注解
RabbitMQ队列消息监听注解,该注解配置监听queues
内的队列名称列表,可以配置多个。队列名称对应Constants常量中的对应的名称user.save.queue.name。
b. @RabbitHandler注解
RabbitMQ消息处理方法,该方法的参数需要消息发送者发送的消息的类型保持一致,否则无法自动调用消费方法,也就无法完成消息的分发。
这样我们就监听了user.save.queue.name这个队列上的消息,只要我们往Direct Exchange交换机发送满足route key的消息,该消息就会由交换机分发到我们接收者。
八、启动项目
该部分启动日志就是我们配置的RabbitMQ
初始化信息,我们可以看到项目启动时会自动与配置的RabbitMQ
进行关联。
浏览器访问http://localhost:1111/sendMessage,如下图:
从打印的日志可以看到,我们成功发送了一条消息到RabbitMQ,监听了该队列的接收者成功接收到发送的数据,并且成功执行了发送消息成功回调方法。
上面示例只是一个消息发送者对应一个消息接收者(一对一),以下我们改造一下代码,实现一个消息发送者对应多个消息接收者(一对多)。
九、RabbitMQ实现一对多消息消费
一对多,就是消息发送者发送消息,对应有多个消息接收者监听。
步骤:
【a】新增一个消息接收者,注意监听的队列名称需要一样,这里复制ReceiveMessage取名为ReceiveMessage2,内容与ReceiveMessage相同,只是加了日志输出区别哪一个消息接收者。
-
-
“user.save.queue.name”)(queues =
-
public class ReceiveMessage2 {
-
private static Logger logger = LoggerFactory.getLogger(ReceiveMessage2.class);
-
-
-
public void receiveMessage(String userName) {
-
logger.info(“【消息接收者2】消息接收成功,用户名为:” + userName);
-
//可以添加自定义业务逻辑处理
-
}
-
-
}
【b】在UserController中新增一个方法,用于测试一对多
-
“/sendMessage/oneToMany”)(
-
public void sendMessageOneToMany() {
-
//循环十次,消费者发送十条消息到RabbitMQ
-
for (int i = 1; i <= 10; i++) {
-
UserEntity userEntity = new UserEntity();
-
userEntity.setAge(20);
-
userEntity.setName(“zhangsan”.concat(String.valueOf(i)));
-
userService.saveUser(userEntity);
-
}
-
}
这里我们使用for循环模拟发送十条消息,我们启动项目,浏览器访问http://localhost:1111/sendMessage/oneToMany,
由上图可见,
消息接收者1成功的接受了4条对应消息内容,不过具体接受的条数并不是固定的,由RabbitMQ
消息转发权重内部问题。消息接收者2获得了6条消息,不过也没有什么规律,编号也没有什么顺序。多测试几次,会发现每个消息接收者接收的消息条数并不是固定不变的。
以上就是一对多消息消费的示例。
九、总结
RabbitMQ不能进行批量处理处理,消息发送者发送的消息只能一条一条存入队列,消息接收者根据匹配的规则从队列中一条一条的取出。每次消息接收者取出消息时会通知队列,我获取到了发送者发送的消息,当队列接收到这条消息,就会把消息删除(默认的ACK机制)。如果在接收消息之后,消费者挂掉,或者任何情况没有返回ack,队列中这条消息将不会删除,可以一直存着,等待其他消费者来取。注意,但是如果设置不返回ack,消息发送者一直不间断发送消息到RabbitMQ,会导致RabbitMQMQ仓库炸了。
——————— 本文来自 weixiaohuai 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/weixiaohuai/article/details/82790785?utm_source=copy