|
@@ -0,0 +1,304 @@
|
|
|
+---
|
|
|
+title: RabbitMQ
|
|
|
+date: 2019-01-19 13:51:17
|
|
|
+tags: [Spring Boot,消息队列]
|
|
|
+categories: [Java,Spring]
|
|
|
+---
|
|
|
+# 消息队列
|
|
|
+> 单线程中间件,主要用于异步通知、消息分发、缓存、分布式事务等场景
|
|
|
+
|
|
|
+## RabbitMQ
|
|
|
+> 阿里的开源软件
|
|
|
+
|
|
|
+主要有Exchange 交换器 和 Queue 队列功能组件。
|
|
|
+生产者会向Exchange发送消息并且绑定一个RoutingKey,
|
|
|
+Exchange 用来接收生产者发送的消息并通过模式和规则将这些消息路由给服务器中的队列,
|
|
|
+Exchange通过BindingKey找到匹配的队列,Queue 用来保存消息直到发送给消费者。
|
|
|
+![rabbitmq_model](rabbitmq_model.png)
|
|
|
+
|
|
|
+**四种运行模式**
|
|
|
+
|
|
|
+- fanout
|
|
|
+
|
|
|
+每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
|
|
|
+fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,
|
|
|
+每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。
|
|
|
+很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout
|
|
|
+类型转发消息是最快的,忽略RoutingKey
|
|
|
+![fanout 模式](fanout.png)
|
|
|
+
|
|
|
+- direct
|
|
|
+
|
|
|
+Exchange 通过比配RoutingKey和BindingKey相同的队列进行转发
|
|
|
+![direct 模式](direct.png)
|
|
|
+
|
|
|
+- topic
|
|
|
+
|
|
|
+与direct类似,更加灵活,RoutingKey 类似com.aa.bb.*/com.aa,bb.# ,用于
|
|
|
+匹配BindingKey复合的队列(“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个))
|
|
|
+![topic 模式](topic.png)
|
|
|
+
|
|
|
+- headers
|
|
|
+
|
|
|
+headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
|
|
|
+在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
|
|
|
+该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
|
|
|
+
|
|
|
+_参考:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html_
|
|
|
+
|
|
|
+
|
|
|
+## 安装 下载 命令
|
|
|
+
|
|
|
+linux/centos : 需现安装Erlang 环境 再通过官网 rpm 文件安装或yum install
|
|
|
+
|
|
|
+```
|
|
|
+系统服务: server rabbitmq-server start
|
|
|
+执行文件:
|
|
|
+rabbitmq-server -detached # 使用守护进程方式启动
|
|
|
+rabbitmq-server start # 使用阻塞方式启动
|
|
|
+rabbitmqctl stop # 关闭rabbitmq
|
|
|
+rabbitmqctl list_users # 查看后台管理员名单
|
|
|
+rabbitmqctl list_queues # 查看当前的所有的队列
|
|
|
+rabbitmqctl list_exchanges # 查看所有的交换机
|
|
|
+rabbitmqctl list_bindings # 查看所有的绑定
|
|
|
+rabbitmqctl list_connections # 查看所有的tcp连接
|
|
|
+rabbitmqctl list_channels # 查看所有的信道
|
|
|
+rabbitmqctl stop_app # 关闭应用
|
|
|
+rabbitmqctl start_app # 打开应用
|
|
|
+rabbitmqctl reset # 清空队列
|
|
|
+```
|
|
|
+
|
|
|
+## Spring boot 集成
|
|
|
+生成 Exchange 和 Queue 可以通过ip:15672 管理页面手动添加,也可以通过项目启动配置文件生成,
|
|
|
+两种方式取并集。消息类型可以发送json字符串或对象。
|
|
|
+
|
|
|
+### 配置文件
|
|
|
+```java
|
|
|
+package com.dzdy.dcloud.dcloud.config;
|
|
|
+
|
|
|
+import org.springframework.amqp.core.Binding;
|
|
|
+import org.springframework.amqp.core.BindingBuilder;
|
|
|
+import org.springframework.amqp.core.FanoutExchange;
|
|
|
+import org.springframework.amqp.core.Queue;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author : wangzhiyong
|
|
|
+ * @date : 2019/1/17 14:47
|
|
|
+ * description : fanout 模式
|
|
|
+ */
|
|
|
+//@Configuration
|
|
|
+public class FanoutRabbitConfig {
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue fanoutQueueA(){
|
|
|
+ return new Queue("dcloud.fanout.queue");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue fanoutQueueB() {
|
|
|
+ return new Queue("dcloud.fanout.queue2");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public FanoutExchange fanoutExchange(){
|
|
|
+ return new FanoutExchange("dcloud.fanout");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeA(Queue fanoutQueueA,FanoutExchange fanoutExchange){
|
|
|
+ return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeB(Queue fanoutQueueB,FanoutExchange fanoutExchange){
|
|
|
+ return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+```
|
|
|
+```java
|
|
|
+package com.dzdy.dcloud.dcloud.config;
|
|
|
+
|
|
|
+import org.springframework.amqp.core.*;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author : wangzhiyong
|
|
|
+ * @date : 2019/1/17 16:26
|
|
|
+ * description :
|
|
|
+ */
|
|
|
+//@Configuration
|
|
|
+public class DirectRabbitConfig {
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public DirectExchange directExchange(){
|
|
|
+ return new DirectExchange("dcloud.direct");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue directQueueA(){
|
|
|
+ return new Queue("dcloud.direct.queue1");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue directQueueB() {
|
|
|
+ return new Queue("dcloud.direct.queue2");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeA(Queue directQueueA, DirectExchange directExchange){
|
|
|
+ return BindingBuilder.bind(directQueueA).to(directExchange).with("aa");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeBA(Queue directQueueB,DirectExchange directExchange){
|
|
|
+ return BindingBuilder.bind(directQueueB).to(directExchange).with("aa");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeBB(Queue directQueueB,DirectExchange directExchange){
|
|
|
+ return BindingBuilder.bind(directQueueB).to(directExchange).with("bb");
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+```java
|
|
|
+package com.dzdy.dcloud.dcloud.config;
|
|
|
+
|
|
|
+import org.springframework.amqp.core.Binding;
|
|
|
+import org.springframework.amqp.core.BindingBuilder;
|
|
|
+import org.springframework.amqp.core.Queue;
|
|
|
+import org.springframework.amqp.core.TopicExchange;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author : wangzhiyong
|
|
|
+ * @date : 2019/1/17 16:54
|
|
|
+ * description :
|
|
|
+ */
|
|
|
+//@Configuration
|
|
|
+public class TopicRabbitConfig {
|
|
|
+ @Bean
|
|
|
+ public TopicExchange topicExchange() {
|
|
|
+ return new TopicExchange("dcloud.topic");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue topicQueueA() {
|
|
|
+ return new Queue("dcloud.topic.queue10");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue topicQueueB() {
|
|
|
+ return new Queue("dcloud.topic.queue11");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeA(Queue topicQueueA, TopicExchange topicExchange) {
|
|
|
+ return BindingBuilder.bind(topicQueueA).to(topicExchange).with("com.ys");
|
|
|
+ }
|
|
|
+
|
|
|
+ //星号(*) :只能匹配一个单词 井号(#):可以匹配0个或多个单词
|
|
|
+ @Bean
|
|
|
+ public Binding bindingExchangeBA(Queue topicQueueB, TopicExchange topicExchange) {
|
|
|
+ return BindingBuilder.bind(topicQueueB).to(topicExchange).with("com.dzdy.#");
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+```
|
|
|
+### 生产者
|
|
|
+```java
|
|
|
+package com.dzdy.dcloud.dcloud;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.dzdy.dcloud.dcloud.vo.MqUserVo;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.springframework.amqp.core.AmqpTemplate;
|
|
|
+import org.springframework.boot.test.context.SpringBootTest;
|
|
|
+import org.springframework.test.context.junit4.SpringRunner;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+@RunWith(SpringRunner.class)
|
|
|
+@SpringBootTest
|
|
|
+@Slf4j
|
|
|
+public class DcloudMqApplicationTests {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private AmqpTemplate rabbitTemplate;
|
|
|
+
|
|
|
+ private MqUserVo userVo = new MqUserVo("张氏", 12, 33.23, LocalDateTime.now());
|
|
|
+ @Test
|
|
|
+ public void contextLoads() {
|
|
|
+ }
|
|
|
+
|
|
|
+ // config 可以不用配置 如果配置会和网页配置的想重合
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void fanoutSendMsg() {
|
|
|
+// rabbitTemplate.convertAndSend("dcloud.fanout","",new MqUserVo("张氏",12,33.23,LocalDateTime.now()));
|
|
|
+ rabbitTemplate.convertAndSend("dcloud.fanout", "", JSONObject.toJSONString(userVo));
|
|
|
+// 直接发送到队列
|
|
|
+// rabbitTemplate.convertAndSend("dcloud.fanout.queue","hello mq");
|
|
|
+ log.info("fanoutSendMsg - success");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void directSendMsg() {
|
|
|
+ // 必须匹配 routingKey 不能为空
|
|
|
+ rabbitTemplate.convertAndSend("dcloud.direct", "bb", JSONObject.toJSONString(userVo));
|
|
|
+ log.info("directSendMsg - success");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void topicSendMsg(){
|
|
|
+ List<MqUserVo> userVos = new ArrayList<>();
|
|
|
+ userVos.add(userVo);
|
|
|
+ userVos.add(userVo);
|
|
|
+ userVos.add(userVo);
|
|
|
+ rabbitTemplate.convertAndSend("dcloud.topic","com.dzdy",userVos);
|
|
|
+ log.info("topicSendMsg - success");
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 消费者
|
|
|
+```java
|
|
|
+package com.dzdy.dcloud.dcloud.receive;
|
|
|
+
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author : wangzhiyong
|
|
|
+ * @date : 2019/1/17 15:03
|
|
|
+ * description :
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@RabbitListener(queues = "dcloud.direct.queue1")
|
|
|
+public class DirectReceiverA {
|
|
|
+
|
|
|
+ @RabbitHandler
|
|
|
+ public void process(String message) {
|
|
|
+ System.out.println("receive3" + message);
|
|
|
+ }
|
|
|
+
|
|
|
+// @RabbitHandler
|
|
|
+// public void process(MqUserVo mqUserVo) {
|
|
|
+// System.out.println("receive" + mqUserVo.toString());
|
|
|
+// }
|
|
|
+}
|
|
|
+```
|
|
|
+
|