
RabbitMQ
一、基于Queue的消息发送与接收
1、引入 Spring AMQP 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2、消息发送:
rabbitTemplate.convertAndSend(String routingKey, final Object object)
// routingKey 表示 RabbitMQ 中的 queues;object 表示待发内容
rabbitTemplate.convertAndSend("simple.queue", "消费者,你好!");
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("work.queue", "你好,work.queue!");
Thread.sleep(20);
}3、消息接收:
package tech.chstack.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MqListener {
@RabbitListener(queues = "simple.queue")
public void anyMethodName(String msg) {
System.out.println("simple.queue收到消息:" + msg);
}
@RabbitListener(queues = "work.queue")
public void onWorkQueue1(String msg) throws InterruptedException {
System.out.println("【消费者1】收到 work.queue 消息:" + msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void onWorkQueue2(String msg) throws InterruptedException {
System.err.println("【消费者2】收到 work.queue 消息:" + msg);
Thread.sleep(200);
}
}Work Queue 任务模型,即多个消费者绑定到一个队列,共同消费队列中的消息。从代码中可以看出,只要写多个
@RabbitListener(queues = "work.queue")即可。

注意:多个消费者默认采用轮询的方式,来消费信息。对于有“信息处理能力不同的消费者”存在的情况,这种轮询显然是不合理的。
4、能者多劳配置:
# 修改consumer服务配置如下:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息prefetch: 1 表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询。
如何解决消息堆积问题?
(1) 采用 work queue 模型,在队列上绑定多个消费者,提高消息处理的能力。
(2) 优化代码:缓存、池化、异步(Java代码中的异步,而非MQ套娃)
(3) 通过设置 prefetch 为 1,实现能者多劳,降低消费总时间(重要!)
二、通过交换机通信
多个队列的原因是有多个微服务(业务), 多个消费者的原因是加速业务处理
routingKey | 生产者 发送消息时指定的“标签”,告诉交换器“这条消息属于什么类别”。 |
bindingKey | 消费者 绑定队列时设置的“过滤规则”,告诉交换器“哪些队列需要接收这个类别的消息”。 |
1、Fanout交换机

Fanout Exchange 会将收到的消息,广播到每个跟其绑定的queue,所以也叫广播模式
准备工作:rabbitmq控制台创建 huan.fanout 交换机,绑定到 fanout.queue1 和 fanout.queue2
生产者:
// 发送消息
public void convertAndSend(String exchange, String routingKey, final Object object)
// 示例:因为是Fanout是群发,所以不需要绑定routingKey
rabbitTemplate.convertAndSend("huan.fanout", null, "hello everyone!");消费者:
@RabbitListener(queues = "fanout.queue1")
public void onFanout1(String msg) {
System.out.println("【消费者1】收到 fanout.queue1 消息:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void onFanout2(String msg) {
System.out.println("【消费者2】收到 fanout.queue2 消息:" + msg);
}运行结果:
【消费者2】收到 fanout.queue2 消息:hello everyone!
【消费者1】收到 fanout.queue1 消息:hello everyone!2、Direct交换机

Direct Exchange 会将收到的消息,根据路由规则,路由到指定的queue,因此也称为定向路由
(1) 每个 Queue 都与 Exchange 设置一个 BindingKey
(2) 发布者发送消息时,指定消息的 RoutingKey
(3) Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列
准备工作:rabbitmq控制台创建 huan.direct 交换机,绑定到 direct.queue1 和 direct.queue2,以及下表的绑定关系:
| Queue | RoutingKey |
direct.queue1 | red、blue |
direct.queue2 | red、yellow |
生产者:
@Test
void testDirect1() {
String exchangeName = "huan.direct";
String routingKey = "red";
String msg = "红色警报,由于日本排放核污水,惊现哥斯拉!";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testDirect2() {
String exchangeName = "huan.direct";
String routingKey = "blue";
String msg = "蓝色通知,警报解除,哥斯拉是假的!";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}消费者:
@RabbitListener(queues = "direct.queue1")
public void onDirect1(String msg) {
System.out.println("【消费者1】收到 direct.queue1 消息:" + msg);
}
@RabbitListener(queues = "direct.queue2")
public void onDirect2(String msg) {
System.out.println("【消费者2】收到 direct.queue2 消息:" + msg);
}运行结果:
【消费者2】收到 direct.queue2 消息:红色警报,由于日本排放核污水,惊现哥斯拉!
【消费者1】收到 direct.queue1 消息:红色警报,由于日本排放核污水,惊现哥斯拉!
------------------------------------------------------------------
【消费者1】收到 direct.queue1 消息:蓝色通知,警报解除,哥斯拉是假的!3、Topic交换机

Topic Exchange 与 Direct Exchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以 . 分割。例如:
| routingKey | 含义 |
| china.news | 中国的新闻消息 |
| china.weather | 中国的天气消息 |
| japan.news | 日本的新闻消息 |
| japan.weather | 日本的天气消息 |
Queue 与 Exchange 绑定时指定 bindingKey 可以用通配符:
| 通配符 | 含义 |
| # | 代指0个或多个单词 |
| * | 代指1个单词 |
准备工作:RabbitMQ 控制台创建 huan.topic 交换机,绑定到 topic.queue1 和 topic.queue2,以及下图的 routingKey 绑定关系:

生产者:
@Test
void testTopic1() {
String exchangeName = "huan.topic";
String routingKey = "japan.news";
String msg = "蓝色通知,警报解除,哥斯拉是假的!";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testTopic2() {
String exchangeName = "huan.topic";
String routingKey = "china.news";
String msg = "中国新闻,xxxxxxxx!";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testTopic3() {
String exchangeName = "huan.topic";
String routingKey = "china.weather";
String msg = "中国天气,xxxxxxxx!";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}消费者:
@RabbitListener(queues = "topic.queue1")
public void onTopic1(String msg) {
System.out.println("【消费者1】收到 topic.queue1 消息:" + msg);
}
@RabbitListener(queues = "topic.queue2")
public void onTopic2(String msg) {
System.out.println("【消费者2】收到 topic.queue2 消息:" + msg);
}运行结果:
【消费者2】收到 topic.queue2 消息:蓝色通知,警报解除,哥斯拉是假的!
------------------------------------------------------------------
【消费者2】收到 topic.queue2 消息:中国新闻,xxxxxxxx!
【消费者1】收到 topic.queue1 消息:中国新闻,xxxxxxxx!
------------------------------------------------------------------
【消费者1】收到 topic.queue1 消息:中国天气,xxxxxxxx!描述一下Direct交换机和Topic交换机的差异?
- topic交换机接收消息的routingKey可以是多个单词,以
.分割 - topic交换机与队列绑定时的bindingKey可以指定通配符
三、声明交换机和队列
交换机、队列以及绑定关系,可以用以下两种方式声明。
1、Java API 方式
步骤为:①声明交换机 ②声明队列 ③声明绑定关系
(1) Fanout 交换机
package tech.chstack.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
// 方式1,使用 builder
// return ExchangeBuilder.fanoutExchange("huan.fanout1").build();
// 方式2,直接 new
return new FanoutExchange("huan.fanout2");
}
@Bean
public Queue fanoutQueue3() {
// durable: 持久化(写入磁盘)
// return QueueBuilder.durable("fanout.queue3").build();
// 直接 new 的话,默认就是持久的
return new Queue("fanout.queue3");
}
/**
* 绑定队列和交换机
* 方式一:方法参数列表传参
*/
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue4() {
return new Queue("fanout.queue4");
}
/**
* 绑定队列和交换机
* 方式二:直接调方法
*/
@Bean
public Binding fanoutBinding4() {
// 下面的传参,实际上不是直接调方法,@Configuration 注解的类中,所有的加了 @Bean 注解
// 的方法,都会被 Spring 动态代理。当我们调用这个方法时,会检查有没有这个bean,如果有,
// 则直接返回这个 bean,如果没有,则调用这个方法,并把返回值作为这个bean
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}
}(2) Direct 交换机
package tech.chstack.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("huan.direct");
}
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1");
}
// 为 direct 类型时,绑定相当繁琐!!
@Bean
public Binding directBindingRed(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
// 为 direct 类型时,绑定相当繁琐!!
@Bean
public Binding directBindingBlue(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
}2、Java Annotation 方式
package tech.chstack.consumer.listener;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AnnotationListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue.tom", durable = "true"),
exchange = @Exchange(name = "huan.direct2", type = ExchangeTypes.DIRECT),
key = {"milk", "water"}
))
public void onDirectTom(String msg) {
System.out.println("【消费者1】收到 direct.queue.tom 消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue.jerry", durable = "true"),
exchange = @Exchange(name = "huan.direct2", type = ExchangeTypes.DIRECT),
key = {"juice", "water"}
))
public void onDirectJerry(String msg) {
System.out.println("【消费者2】收到 direct.queue.jerry 消息:" + msg);
}
}实现效果:

四、消息转换器
1、默认实现 —— SimpleMessageConverter
准备工作:RabbitMQ 控制台创建 object.queue 的Queue
生产者:
@Test
void testSendObject() {
Map<String, String> msg = new HashMap<>();
msg.put("name", "huan");
msg.put("age", "18");
rabbitTemplate.convertAndSend("object.queue", msg);
}消费者(未编写,直接去控制台查看):

content_type: application/x-java-serialized-object 表示JDK的序列方式,我们可以到源码里看看是不是这样:
跟进 rabbitTemplate.convertAndSend() 函数,发现它使用了一个 MessageConverter 类型的接口。使用 CTRL+H 查看实现类:

可以看出,默认的消息转换器使用的是 SimpleMessageConverter
继续查看 toMessage() 方法是哪个,使用 CTRL+Alt+B 查看实现:

虽然 SimpleMessageConverter 类不直接提供 toMessage() 的实现,但是在其爷爷类 AbstractMessageConverter 提供了实现,说明这里调的是它爷爷的方法。
继续跟进 AbstractMessageConverter 的 toMessage() 方法,找到 createMessage()

在 debug 过程中,回退到上一步追踪位置:Ctrl+Alt+←
关键源码:
// SimpleMessageConverter 类
@Override
protected Message createMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
...
if (object instanceof byte[] objectBytes) {
...
} else if (object instanceof String string) {
...
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
...
}
...
}
// SerializationUtils.serialize(Object object)
public static byte[] serialize(Object object) {
if (object == null) {
return null;
}
ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
new ObjectOutputStream(stream).writeObject(object);
}
catch (IOException e) {
throw new IllegalArgumentException("Could not serialize object of type: "
+ object.getClass(), e);
}
return stream.toByteArray();
}2、采用JSON序列化代实现
默认是有 Jackson2JsonMessageConvertor 实现的,只是缺少依赖:

(1) 引入依赖:
<!-- 引入jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>(2) 配置 @Bean
所有用到 RabbitMQ 的子工程都需要配置这个 @Bean
package tech.chstack.consumer.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
当然,@Bean 除了在 @Configuration 中配置,可以在启动类上配置。
(3) 查看消息

消息类型已变为:content_type: application/json
3、消息接收
发送是什么类型,接收就是什么类型,非常便捷:
消费者:
@RabbitListener(queues = "object.queue")
public void onObject(Map<String, String> msg) {
System.out.println("【消费者】收到 object.queue 队列的消息:" + msg);
}运行结果:
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:148) ~[spring-rabbit-3.2.5.jar:3.2.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1487) ~[spring-rabbit-3.2.5.jar:3.2.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1780) ~[spring-rabbit-3.2.5.jar:3.2.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-3.2.5.jar:3.2.5]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1518) ~[spring-rabbit-3.2.5.jar:3.2.5]
at io.micrometer.observation.Observation.observe(Observation.java:498) ~[micrometer-observation-1.14.6.jar:1.14.6]
【消费者】收到 object.queue 队列的消息:{name=huan, age=18}第二条Json序列化的消息已经顺利反序列化,而第一条使用的是JDK的序列化方式,无法正确序列化,所以报了错。但是第一条消息也被消费了,此时出现了消息丢失的情况。
