# 一、初识 MQ
同步调用的问题微服务间基于 Feign 的调用就属于同步方式,存在一些问题。

# 同步调用的优点:
时效性较强,可以立即得到结果
# 同步调用的问题:
耦合度高性能和吞吐能力下降有额外的资源消耗有级联失败问题
故此异步调用方案产生:异步调用常见实现就是事件驱动模式
# 事件驱动的优势:
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量削峰
# 异步通信的优点:
耦合度低吞吐量提升故障隔离流量削峰
# 异步通信的缺点:
依赖于 Broker 的可靠性、安全性、吞吐能力架构复杂了,业务没有明显的流程线,不好追踪管理
# 二、RabbitMQ 快速入门
# 安装 RabbitMQ
在线拉取:docker pull rabbitmq:3-management
启动命令: (注释去掉)docker run \ -e RABBITMQ_DEFAULT_USER=lyggwsp \ -e RABBITMQ_DEFAULT_PASS=wyr0307 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ # 管理访问端口 -p 5672:5672 \ # 通讯端口 -d \ rabbitmq:3-management
我们访问主机地址 + 15672 端口任然无效,是因为我们插件没开:进入容器内部:docker exec -it mq bash 修改插件:rabbitmq-plugins enable rabbitmq_management 这样我们访问就有效了。
rabbitmq 的结构和概念:
# RabbitMQ 中的几个概念:
channel:操作 MQ 的工具 exchange:路由消息到队列中 queue:缓存消息 virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组
常见的消息模型:
官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列
- queuequeue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息

# 基本消息队列的消息发送流程:
建立 connection 创建 channel 利用 channel 声明队列利用 channel 向队列发送消息基本消息队列的消息接收流程:建立 connection 创建 channel 利用 channel 声明队列定义 consumer 的消费行为 handleDelivery () 利用 channel 将消费者与队列绑定
# 三、SpringAMQP
步骤 1:引入 AMQP 依赖因为 publisher 和 consumer 服务都需要 amqp 依赖,因此这里把依赖直接放到父工程 mq-demo 中:
步骤 2:在 publisher 中编写测试方法,向 simple.queue 发送消息
spring: | |
rabbitmq: | |
# 主机名 | |
host: 101.42.152.244 | |
port: 5672 | |
#虚拟主机 | |
virtual-host: "/" | |
# 用户名 | |
username: lyggwsp | |
# 密码 | |
password: wyr0307 |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
public class SpringAMOPTest { | |
@Autowired | |
private RabbitTemplate rabbitTemplate; | |
@Test | |
public void testS(){ | |
String queueName = "simple.queue"; | |
String message = "hello,my name is publisher"; | |
rabbitTemplate.convertAndSend(queueName,message); | |
} | |
} |
# 什么是 AMQP?
应用间消息通信的一种协议,与语言和平台无关。
# SpringAMQP 如何发送消息?
引入 amqp 的 starter 依赖配置 RabbitMQ 地址利用 RabbitTemplate 的 convertAndSend 方法
步骤 3:在 consumer 中编写消费逻辑,监听 simple.queue
spring: | |
rabbitmq: | |
# 主机名 | |
host: 101.42.152.244 | |
port: 5672 | |
#虚拟主机 | |
virtual-host: "/" | |
# 用户名 | |
username: lyggwsp | |
# 密码 | |
password: wyr0307 |
这是一个组件一个组件!!!!
@Component | |
public class SpringAMOPA { | |
@RabbitListener(queues = "simple.queue") | |
public void t(String msg){ | |
System.out.println("接收到的消息是:" + msg); | |
} | |
} |
SpringAMQP 如何接收消息?引入 amqp 的 starter 依赖配置 RabbitMQ 地址定义类,添加 @Component 注解类中声明方法,添加 @RabbitListener 注解,方法参数就时消息注意:消息一旦消费就会从队列删除,RabbitMQ 没有消息回溯功能
Work Queue 工作队列 Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
模拟 WorkQueue,实现一个队列绑定多个消费者
基本思路如下:在 publisher 服务中定义测试方法,每秒产生 50 条消息,发送到 simple.queue 在 consumer 服务中定义两个消息监听者,都监听 simple.queue 队列消费者 1 每秒处理 50 条消息,消费者 2 每秒处理 10 条消息
这玩意讲的也就是一个重点:(在监听者 yml 配置文件中多配置一下 prefetch:)
spring: | |
rabbitmq: | |
# 主机名 | |
host: 101.42.152.244 | |
port: 5672 | |
#虚拟主机 | |
virtual-host: "/" | |
# 用户名 | |
username: lyggwsp | |
# 密码 | |
password: wyr0307 | |
# 控制预取消息的上限 | |
listener: | |
simple: | |
prefetch: 1 # 每次只能获取一条消息,处理完成之后才能获取下一条 |
# Work 模型的使用:
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理通过设置 prefetch 来控制消费者预取的消息数量
发布( Publish )、订阅( Subscribe )发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了 exchange(交换机)。常见 exchange 类型包括:Fanout:广播 Direct:路由 Topic:话题
注意:exchange 负责消息路由,而不是存储,路由失败则消息丢失
# 发布订阅 - Fanout Exchange
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的 queue