RabbitMQ

一、基于Queue的消息发送与接收

1、引入 Spring AMQP 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、消息发送:
rabbitTemplate.convertAndSend(String routingKey, final Object object)
// routingKey 表示 RabbitMQ 中的 queues;object 表示待发内容
rabbitTemplate.convertAndSend("simple.queue", "消费者,你好!");

for (int i = 0; i < 50; i++) {
    rabbitTemplate.convertAndSend("work.queue", "你好,work.queue!");
    Thread.sleep(20);
}
3、消息接收:
package tech.chstack.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MqListener {

    @RabbitListener(queues = "simple.queue")
    public void anyMethodName(String msg) {
        System.out.println("simple.queue收到消息:" + msg);
    }

    @RabbitListener(queues = "work.queue")
    public void onWorkQueue1(String msg) throws InterruptedException {
        System.out.println("【消费者1】收到 work.queue 消息:" + msg);
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void onWorkQueue2(String msg) throws InterruptedException {
        System.err.println("【消费者2】收到 work.queue 消息:" + msg);
        Thread.sleep(200);
    }
}

Work Queue 任务模型,即多个消费者绑定到一个队列,共同消费队列中的消息。从代码中可以看出,只要写多个 @RabbitListener(queues = "work.queue") 即可。

注意:多个消费者默认采用轮询的方式,来消费信息。对于有“信息处理能力不同的消费者”存在的情况,这种轮询显然是不合理的。

4、能者多劳配置:
# 修改consumer服务配置如下:
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

prefetch: 1  表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询。

如何解决消息堆积问题?

(1) 采用 work queue 模型,在队列上绑定多个消费者,提高消息处理的能力。

(2) 优化代码:缓存、池化、异步(Java代码中的异步,而非MQ套娃)

(3) 通过设置 prefetch 为 1,实现能者多劳,降低消费总时间(重要!)

二、通过交换机通信

多个队列的原因是有多个微服务(业务), 多个消费者的原因是加速业务处理

routingKey 生产者 发送消息时指定的“标签”,告诉交换器“这条消息属于什么类别”。
bindingKey 消费者 绑定队列时设置的“过滤规则”,告诉交换器“哪些队列需要接收这个类别的消息”。
1、Fanout交换机

Fanout Exchange 会将收到的消息,广播到每个跟其绑定的queue,所以也叫广播模式

准备工作:rabbitmq控制台创建 huan.fanout 交换机,绑定到 fanout.queue1fanout.queue2 

生产者:

// 发送消息
public void convertAndSend(String exchange, String routingKey, final Object object)
// 示例:因为是Fanout是群发,所以不需要绑定routingKey
rabbitTemplate.convertAndSend("huan.fanout", null, "hello everyone!");

消费者:

@RabbitListener(queues = "fanout.queue1")
public void onFanout1(String msg) {
    System.out.println("【消费者1】收到 fanout.queue1 消息:" + msg);
}

@RabbitListener(queues = "fanout.queue2")
public void onFanout2(String msg) {
    System.out.println("【消费者2】收到 fanout.queue2 消息:" + msg);
}

运行结果:

【消费者2】收到 fanout.queue2 消息:hello everyone!
【消费者1】收到 fanout.queue1 消息:hello everyone!
2、Direct交换机

Direct Exchange 会将收到的消息,根据路由规则,路由到指定的queue,因此也称为定向路由

(1) 每个 Queue 都与 Exchange 设置一个 BindingKey
(2) 发布者发送消息时,指定消息的 RoutingKey
(3) Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

准备工作:rabbitmq控制台创建 huan.direct 交换机,绑定到 direct.queue1direct.queue2,以及下表的绑定关系:

Queue RoutingKey 
direct.queue1red、blue
direct.queue2red、yellow

生产者:

@Test
void testDirect1() {
    String exchangeName = "huan.direct";
    String routingKey = "red";
    String msg = "红色警报,由于日本排放核污水,惊现哥斯拉!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testDirect2() {
    String exchangeName = "huan.direct";
    String routingKey = "blue";
    String msg = "蓝色通知,警报解除,哥斯拉是假的!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}

消费者:

@RabbitListener(queues = "direct.queue1")
public void onDirect1(String msg) {
    System.out.println("【消费者1】收到 direct.queue1 消息:" + msg);
}

@RabbitListener(queues = "direct.queue2")
public void onDirect2(String msg) {
    System.out.println("【消费者2】收到 direct.queue2 消息:" + msg);
}

运行结果:

【消费者2】收到 direct.queue2 消息:红色警报,由于日本排放核污水,惊现哥斯拉!
【消费者1】收到 direct.queue1 消息:红色警报,由于日本排放核污水,惊现哥斯拉!
------------------------------------------------------------------
【消费者1】收到 direct.queue1 消息:蓝色通知,警报解除,哥斯拉是假的!
3、Topic交换机

Topic Exchange 与 Direct Exchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以 . 分割。例如:

routingKey含义
china.news中国的新闻消息
china.weather中国的天气消息
japan.news日本的新闻消息
japan.weather日本的天气消息

Queue 与 Exchange 绑定时指定 bindingKey 可以用通配符:

通配符含义
#代指0个或多个单词
*代指1个单词

准备工作:RabbitMQ 控制台创建 huan.topic 交换机,绑定到 topic.queue1topic.queue2,以及下图的 routingKey 绑定关系:

 

生产者:

@Test
void testTopic1() {
    String exchangeName = "huan.topic";
    String routingKey = "japan.news";
    String msg = "蓝色通知,警报解除,哥斯拉是假的!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testTopic2() {
    String exchangeName = "huan.topic";
    String routingKey = "china.news";
    String msg = "中国新闻,xxxxxxxx!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
------------------------------------------------------------------
@Test
void testTopic3() {
    String exchangeName = "huan.topic";
    String routingKey = "china.weather";
    String msg = "中国天气,xxxxxxxx!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}

消费者:

@RabbitListener(queues = "topic.queue1")
public void onTopic1(String msg) {
    System.out.println("【消费者1】收到 topic.queue1 消息:" + msg);
}

@RabbitListener(queues = "topic.queue2")
public void onTopic2(String msg) {
    System.out.println("【消费者2】收到 topic.queue2 消息:" + msg);
}

运行结果:

【消费者2】收到 topic.queue2 消息:蓝色通知,警报解除,哥斯拉是假的!
------------------------------------------------------------------
【消费者2】收到 topic.queue2 消息:中国新闻,xxxxxxxx!
【消费者1】收到 topic.queue1 消息:中国新闻,xxxxxxxx!
------------------------------------------------------------------
【消费者1】收到 topic.queue1 消息:中国天气,xxxxxxxx!

描述一下Direct交换机和Topic交换机的差异?

  • topic交换机接收消息的routingKey可以是多个单词,以.分割
  • topic交换机与队列绑定时的bindingKey可以指定通配符

三、声明交换机和队列

交换机、队列以及绑定关系,可以用以下两种方式声明。

1、Java API 方式

步骤为:①声明交换机 ②声明队列 ③声明绑定关系

(1) Fanout 交换机

package tech.chstack.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        // 方式1,使用 builder
        // return ExchangeBuilder.fanoutExchange("huan.fanout1").build();
        // 方式2,直接 new
        return new FanoutExchange("huan.fanout2");
    }

    @Bean
    public Queue fanoutQueue3() {
        // durable: 持久化(写入磁盘)
        // return QueueBuilder.durable("fanout.queue3").build();
        // 直接 new 的话,默认就是持久的
        return new Queue("fanout.queue3");
    }

    /**
     * 绑定队列和交换机
     * 方式一:方法参数列表传参
     */
    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }


    @Bean
    public Queue fanoutQueue4() {
        return new Queue("fanout.queue4");
    }

    /**
     * 绑定队列和交换机
     * 方式二:直接调方法
     */
    @Bean
    public Binding fanoutBinding4() {
        // 下面的传参,实际上不是直接调方法,@Configuration 注解的类中,所有的加了 @Bean 注解
        // 的方法,都会被 Spring 动态代理。当我们调用这个方法时,会检查有没有这个bean,如果有,
        // 则直接返回这个 bean,如果没有,则调用这个方法,并把返回值作为这个bean
        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}

(2) Direct 交换机

package tech.chstack.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("huan.direct");
    }

    @Bean
    public Queue directQueue1() {
        return new Queue("direct.queue1");
    }

    // 为 direct 类型时,绑定相当繁琐!!
    @Bean
    public Binding directBindingRed(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    // 为 direct 类型时,绑定相当繁琐!!
    @Bean
    public Binding directBindingBlue(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

}
2、Java Annotation 方式
package tech.chstack.consumer.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AnnotationListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue.tom", durable = "true"),
            exchange = @Exchange(name = "huan.direct2", type = ExchangeTypes.DIRECT),
            key = {"milk", "water"}
    ))
    public void onDirectTom(String msg) {
        System.out.println("【消费者1】收到 direct.queue.tom 消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue.jerry", durable = "true"),
            exchange = @Exchange(name = "huan.direct2", type = ExchangeTypes.DIRECT),
            key = {"juice", "water"}
    ))
    public void onDirectJerry(String msg) {
        System.out.println("【消费者2】收到 direct.queue.jerry 消息:" + msg);
    }
    
}

实现效果:

四、消息转换器

1、默认实现 —— SimpleMessageConverter

准备工作:RabbitMQ 控制台创建 object.queue 的Queue

生产者:

@Test
void testSendObject() {
    Map<String, String> msg = new HashMap<>();
    msg.put("name", "huan");
    msg.put("age", "18");
    rabbitTemplate.convertAndSend("object.queue", msg);
}

消费者(未编写,直接去控制台查看):

content_type: application/x-java-serialized-object 表示JDK的序列方式,我们可以到源码里看看是不是这样:

跟进 rabbitTemplate.convertAndSend() 函数,发现它使用了一个 MessageConverter 类型的接口。使用 CTRL+H 查看实现类:

可以看出,默认的消息转换器使用的是 SimpleMessageConverter 

继续查看 toMessage() 方法是哪个,使用 CTRL+Alt+B 查看实现:

虽然 SimpleMessageConverter 类不直接提供 toMessage() 的实现,但是在其爷爷类 AbstractMessageConverter 提供了实现,说明这里调的是它爷爷的方法。

继续跟进 AbstractMessageConvertertoMessage() 方法,找到 createMessage()

在 debug 过程中,回退到上一步追踪位置:Ctrl+Alt+←

关键源码:

// SimpleMessageConverter 类
@Override
protected Message createMessage(Object object, MessageProperties messageProperties)
        throws MessageConversionException {

    ...
    if (object instanceof byte[] objectBytes) {
        ...
    } else if (object instanceof String string) {
        ...
    } else if (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        }
        ...
    }
    ...
}

// SerializationUtils.serialize(Object object)
public static byte[] serialize(Object object) {
    if (object == null) {
        return null;
    }
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    try {
        new ObjectOutputStream(stream).writeObject(object);
    }
    catch (IOException e) {
        throw new IllegalArgumentException("Could not serialize object of type: " 
                                             + object.getClass(), e);
    }
    return stream.toByteArray();
}
2、采用JSON序列化代实现

默认是有 Jackson2JsonMessageConvertor 实现的,只是缺少依赖:

(1) 引入依赖:

<!-- 引入jackson -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

(2) 配置 @Bean

所有用到 RabbitMQ 的子工程都需要配置这个 @Bean

package tech.chstack.consumer.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

当然,@Bean 除了在 @Configuration 中配置,可以在启动类上配置。

(3) 查看消息

消息类型已变为:content_type: application/json 

3、消息接收

发送是什么类型,接收就是什么类型,非常便捷:

消费者:

@RabbitListener(queues = "object.queue")
public void onObject(Map<String, String> msg) {
    System.out.println("【消费者】收到 object.queue 队列的消息:" + msg);
}

运行结果:

org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
	at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:148) ~[spring-rabbit-3.2.5.jar:3.2.5]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1487) ~[spring-rabbit-3.2.5.jar:3.2.5]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1780) ~[spring-rabbit-3.2.5.jar:3.2.5]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-3.2.5.jar:3.2.5]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1518) ~[spring-rabbit-3.2.5.jar:3.2.5]
	at io.micrometer.observation.Observation.observe(Observation.java:498) ~[micrometer-observation-1.14.6.jar:1.14.6]

【消费者】收到 object.queue 队列的消息:{name=huan, age=18}

第二条Json序列化的消息已经顺利反序列化,而第一条使用的是JDK的序列化方式,无法正确序列化,所以报了错。但是第一条消息也被消费了,此时出现了消息丢失的情况。

This article was updated on