--- title: RabbitMQ 拓展篇 date: 2019-01-22 16:17:19 tags: [Spring Boot,消息队列] categories: [Java,Spring] --- ### jackson 在 rabbitMq 中还有个重要的组件是 MessageConverter,用于消息格式的设置。 默认使用amqp的 SimpleMessageConverter 使用text传输,在传输量较大的数据时比较消耗性能。 另一种就是 Jackson2JsonMessageConverter,使用json传输。 **全局配置文件** ```java @Configuration public class RabbitMqConfig { /* 设置消息传输形式 使用jackson 相对默认SimpleMessageConverter 提高性能 */ /** * 发送消息设置用json的形式序列化 * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } /** * 接受的时候使用jackson 反序列化 * @param connectionFactory * @return */ @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } } ``` **接受者配置** ```java @Component @RabbitListener(queues = "dcloud.fanout.queue",containerFactory = "rabbitListenerContainerFactory") public class FanoutReceiverA { @RabbitHandler public void process(@Payload String message) { System.out.println("receive" + message); } } ``` 贴出simple和jack ,string和Object 数据传输格式 **javabean_jackson_message**![javabean_jackson_message](javabean_jackson_message.jpg) **javabean_simple_message**![javabean_simple_message](javabean_simple_message.jpg) **string_jackson_message**![string_jackson_message](string_jackson_message.jpg) **string_simple_message**![string_simple_message](string_simple_message.jpg) ### 消息高可用 消息不管在生成,传输,队列,消费中都可能存在问题,丢失或者重复消费等,因此需要配置一些参数或功能以达到消息高可用 #### 消息持久化 持久化设置时三个缺一不可。 ##### queue 持久化 设置queue为 durable ,new Queue() 和 RabbitMQ Managemnet 默认持久化 ##### 消息 持久化 核心为设置 Message 的 MessageDeliveryMode 为 PERSISTENT。 使用 rabbitTemplate.convertAndSend 方法中 默认为此模式 ##### exchange 持久化 同queue 设置 exchange Type 为 durable ... #### 消息确认 #### 确认发送(生产者) 继承 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 两个接口 ``` /** * 是否正确到达 Exchange * * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) /** * Exchange 发送到 queue 发送确定 * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) ``` 并设置 指定监听回调 ``` template.setConfirmCallback(this); template.setReturnCallback(this); // 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue, // 那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body); // 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉 template.setMandatory(true); ``` #### 确认发送(消费者) ``` @RabbitHandler public void process(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println("receive" + message); //确认消息,deliveryTag为相对channel的消息唯一标识, //multiple 批处理true 可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(tag, false); //拒绝消息 requeue 是否重新进入队列, true 重新进入队列 false 消息被丢弃 channel.basicReject(tag, true); //否认消息 channel.basicNack(tag, false, true); } ``` ### 死信和延时队列 保证消息高可用的场景还应包含对异常信息的处理,这部分数据在死信交换机中, 延时队列实现异步延迟操作的功能。 #### 应用场景 - 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单 - 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用 - 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试 - 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时 #### 死信触发条件 - 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false - 消息/队列 因为设置了TTL而过期 - 消息进入了一条已经达到最大长度的队列 #### 设置方法 - 新增死信交换器和队列(和普通的无差) - 新建延时队列设置args (x-dead-letter-exchange/x-dead-letter-routing-key) - 设置队列延时 args(x-message-ttl)或消息延时(message.getMessageProperties().setExpiration) ``` /** * 延时队列 * * @return */ @Bean public Queue deployQueue() { Map args = new HashMap<>(2); args.put("x-dead-letter-exchange", "dcloud.dlxExchange"); args.put("x-dead-letter-routing-key", "dlx"); // 设置消息的过期时间, 单位是毫秒 args.put("x-message-ttl", 5000); return new Queue("dcloud.deployQueue", true, false, false, args); } 发送测试 this.rabbitTemplate.convertAndSend(exchange, routingKey, message, message -> { // 设置消息延时 单位 毫秒 message.getMessageProperties().setExpiration(10000 + ""); return message; }, correlationDataExtend); ```