rabbitmq2.md 6.2 KB


title: RabbitMQ 拓展篇 date: 2019-01-22 16:17:19 tags: [Spring Boot,消息队列]

categories: [Java,Spring]

jackson

在 rabbitMq 中还有个重要的组件是 MessageConverter,用于消息格式的设置。 默认使用amqp的 SimpleMessageConverter 使用text传输,在传输量较大的数据时比较消耗性能。 另一种就是 Jackson2JsonMessageConverter,使用json传输。 全局配置文件

@Configuration
public class RabbitMqConfig {

    /* 设置消息传输形式 使用jackson 相对默认SimpleMessageConverter 提高性能 */

    /**
     * 发送消息设置用json的形式序列化
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    /**
     * 接受的时候使用jackson 反序列化
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

接受者配置

@Component
@RabbitListener(queues = "dcloud.fanout.queue",containerFactory = "rabbitListenerContainerFactory")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(@Payload String message) {
        System.out.println("receive" + message);
    }

}

贴出simple和jack ,string和Object 数据传输格式

javabean_jackson_messagejavabean_jackson_message

javabean_simple_messagejavabean_simple_message

string_jackson_messagestring_jackson_message

string_simple_messagestring_simple_message

消息高可用

消息不管在生成,传输,队列,消费中都可能存在问题,丢失或者重复消费等,因此需要配置一些参数或功能以达到消息高可用

消息持久化

持久化设置时三个缺一不可。

queue 持久化

设置queue为 durable ,new Queue() 和 RabbitMQ Managemnet 默认持久化

消息 持久化

核心为设置 Message 的 MessageDeliveryMode 为 PERSISTENT。 使用 rabbitTemplate.convertAndSend 方法中 默认为此模式

exchange 持久化

同queue 设置 exchange Type 为 durable ...

消息确认

确认发送(生产者)

继承 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 两个接口

/**
 * 是否正确到达 Exchange
 *
 * @param correlationData
 * @param ack
 * @param cause
 */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)

/**
 * Exchange 发送到 queue 发送确定
 *
 * @param message
 * @param replyCode
 * @param replyText
 * @param exchange
 * @param routingKey
 */
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

并设置 指定监听回调

template.setConfirmCallback(this);
template.setReturnCallback(this);
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
// 那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
template.setMandatory(true);

确认发送(消费者)

@RabbitHandler
public void process(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    System.out.println("receive" + message);

    //确认消息,deliveryTag为相对channel的消息唯一标识,
    //multiple 批处理true 可以一次性确认 delivery_tag 小于等于传入值的所有消息
    channel.basicAck(tag, false);

    //拒绝消息 requeue 是否重新进入队列, true 重新进入队列 false 消息被丢弃
    channel.basicReject(tag, true);

    //否认消息
    channel.basicNack(tag, false, true);

}

死信和延时队列

保证消息高可用的场景还应包含对异常信息的处理,这部分数据在死信交换机中, 延时队列实现异步延迟操作的功能。

应用场景

  • 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单
  • 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用
  • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试
  • 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时

死信触发条件

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false
  • 消息/队列 因为设置了TTL而过期
  • 消息进入了一条已经达到最大长度的队列

设置方法

  • 新增死信交换器和队列(和普通的无差)
  • 新建延时队列设置args (x-dead-letter-exchange/x-dead-letter-routing-key)
  • 设置队列延时 args(x-message-ttl)或消息延时(message.getMessageProperties().setExpiration)

    /**
    * 延时队列
    *
    * @return
    */
    @Bean
    public Queue deployQueue() {
    Map<String, Object> args = new HashMap<>(2);
    args.put("x-dead-letter-exchange", "dcloud.dlxExchange");
    args.put("x-dead-letter-routing-key", "dlx");
    //        设置消息的过期时间, 单位是毫秒
    args.put("x-message-ttl", 5000);
    return new Queue("dcloud.deployQueue", true, false, false, args);
    }
    
    发送测试
    
    this.rabbitTemplate.convertAndSend(exchange, routingKey, message, message -> {
    // 设置消息延时 单位 毫秒
    message.getMessageProperties().setExpiration(10000 + "");
    return message;
    }, correlationDataExtend);