title: RabbitMQ 拓展篇 date: 2019-01-22 16:17:19 tags: [Spring Boot,消息队列]
在 rabbitMq 中还有个重要的组件是 MessageConverter,用于消息格式的设置。 默认使用amqp的 SimpleMessageConverter 使用text传输,在传输量较大的数据时比较消耗性能。 另一种就是 Jackson2JsonMessageConverter,使用json传输。 全局配置文件
@Configuration
public class RabbitMqConfig {
/* 设置消息传输形式 使用jackson 相对默认SimpleMessageConverter 提高性能 */
/**
* 发送消息设置用json的形式序列化
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
/**
* 接受的时候使用jackson 反序列化
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
接受者配置
@Component
@RabbitListener(queues = "dcloud.fanout.queue",containerFactory = "rabbitListenerContainerFactory")
public class FanoutReceiverA {
@RabbitHandler
public void process(@Payload String message) {
System.out.println("receive" + message);
}
}
贴出simple和jack ,string和Object 数据传输格式
消息不管在生成,传输,队列,消费中都可能存在问题,丢失或者重复消费等,因此需要配置一些参数或功能以达到消息高可用
持久化设置时三个缺一不可。
设置queue为 durable ,new Queue() 和 RabbitMQ Managemnet 默认持久化
核心为设置 Message 的 MessageDeliveryMode 为 PERSISTENT。 使用 rabbitTemplate.convertAndSend 方法中 默认为此模式
同queue 设置 exchange Type 为 durable ...
继承 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 两个接口
/**
* 是否正确到达 Exchange
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
/**
* Exchange 发送到 queue 发送确定
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
并设置 指定监听回调
template.setConfirmCallback(this);
template.setReturnCallback(this);
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
// 那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
template.setMandatory(true);
@RabbitHandler
public void process(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("receive" + message);
//确认消息,deliveryTag为相对channel的消息唯一标识,
//multiple 批处理true 可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(tag, false);
//拒绝消息 requeue 是否重新进入队列, true 重新进入队列 false 消息被丢弃
channel.basicReject(tag, true);
//否认消息
channel.basicNack(tag, false, true);
}
保证消息高可用的场景还应包含对异常信息的处理,这部分数据在死信交换机中, 延时队列实现异步延迟操作的功能。
设置队列延时 args(x-message-ttl)或消息延时(message.getMessageProperties().setExpiration)
/**
* 延时队列
*
* @return
*/
@Bean
public Queue deployQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "dcloud.dlxExchange");
args.put("x-dead-letter-routing-key", "dlx");
// 设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
return new Queue("dcloud.deployQueue", true, false, false, args);
}
发送测试
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, message -> {
// 设置消息延时 单位 毫秒
message.getMessageProperties().setExpiration(10000 + "");
return message;
}, correlationDataExtend);