RabbitMQ实战解决方案
一、RabbitMQ死信队列
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生的原因:
消息投递到MQ中存放 消息已经过期
队列达到最大的长度 (队列容器已经满了)MQ拒绝接受消息
消费者消费多次消息失败,就会转移存放到死信队列中
1.SpringBoot整合死信队列
mave依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
application.yml
spring:
rabbitmq:
####连接地址
host: 192.168.75.130
####端口号
port: 5672
####账号
username: admin
####密码
password: admin
### 地址
virtual-host: /wmh
server:
port: 8080
###模拟演示死信队列
#死信队列
dlx:
exchange: dlx_exchange
queue: order_dlx_queue
routingKey: dlx
#订单队列
order:
exchange: order_exchange
queue: order_queue
routingKey: order
正常队列及死信队列配置:
@Component
public class DeadLetterMQConfig {
/**
* 订单交换机
*/
@Value("${order.exchange}")
private String orderExchange;
/**
* 订单队列
*/
@Value("${order.queue}")
private String orderQueue;
/**
* 订单路由key
*/
@Value("${order.routingKey}")
private String orderRoutingKey;
/**
* 死信交换机
*/
@Value("${dlx.exchange}")
private String dlxExchange;
/**
* 死信队列
*/
@Value("${dlx.queue}")
private String dlxQueue;
/**
* 死信路由
*/
@Value("${dlx.routingKey}")
private String dlxRoutingKey;
/**
* 声明死信交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
/**
* 声明死信队列
*
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
/**
* 声明订单业务交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
/**
* 绑定死信队列到死信交换机
*
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}
/**
* 声明订单队列
*
* @return Queue
*/
@Bean
public Queue orderQueue() {
// 订单队列绑定我们的死信交换机
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange", dlxExchange);
arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, arguments);
}
/**
* 绑定订单队列到订单交换机
*
* @return Binding
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
}
}
订单消费者:
@Component
public class OrderConsumer {
/**
* 监听队列回调的方法
*
* @param msg
*/
@RabbitListener(queues = "order_queue")
public void orderConsumer(String msg) {
System.out.println("正常订单消费者消息msg:" + msg);
}
}
死信消费者:
@Component
public class OrderDlxConsumer {
/**
* 死信队列监听队列回调的方法
*
* @param msg
*/
@RabbitListener(queues = "order_dlx_queue")
public void orderConsumer(String msg) {
System.out.println("死信队列消费订单消息" + msg);
}
}
生产者:
@RestController
public class DeadLetterProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订单交换机
*/
@Value("${order.exchange}")
private String orderExchange;
/**
* 订单路由key
*/
@Value("${order.routingKey}")
private String orderRoutingKey;
@RequestMapping("/sendOrder")
public String sendOrder() {
String msg = "发送订单测试消息";
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
return "succcess";
}
}
如若测试死信队列,可将正常订单消费者代码注释。
二、消息中间件如何获取消费结果
根据业务id主动查询
maven依赖:
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!-- mysql 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
核心代码:
@RestController
public class DeadLetterProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订单交换机
*/
@Value("${order.exchange}")
private String orderExchange;
/**
* 订单路由key
*/
@Value("${order.routingKey}")
private String orderRoutingKey;
@Autowired
private OrderMapper orderMapper;
@RequestMapping("/sendOrder")
public String sendOrder() {
String orderId = System.currentTimeMillis() + "";
String orderName = "测试订单名字----------";
sendMsg(orderName, orderId);
return orderId;
}
@Async
public void sendMsg(String orderName, String orderId) {
OrderEntity orderEntity = new OrderEntity(orderName, orderId);
String msg = JSONObject.toJSONString(orderEntity);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});
}
@RequestMapping("/getOrder")
public Object getOrder(String orderId) {
OrderEntity order = orderMapper.getOrder(orderId);
if (order == null) {
return "该订单没有被消费或者订单号错误!";
}
return order;
}
}
数据访问层:
public interface OrderMapper {
@Insert("insert order_info values (null,#{orderName},#{orderId})")
int addOrder(OrderEntity orderEntity);
@Select("SELECT * from order_info where orderId=#{orderId} ")
OrderEntity getOrder(String orderId);
}
@Data
public class OrderEntity {
private int id;
private String orderName;
private String orderId;
public OrderEntity(String orderName, String orderId) {
this.orderName = orderName;
this.orderId = orderId;
}
public OrderEntity() {
}
}
application.yml
spring:
rabbitmq:
####连接地址
host: 192.168.75.130
####端口号
port: 5672
####账号
username: admin
####密码
password: admin
### 地址
virtual-host: /wmh
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
server:
port: 8080
###模拟演示死信队列
mayikt:
dlx:
exchange: dlx_exchange
queue: order_dlx_queue
routingKey: dlx
###备胎交换机
order:
exchange: order_exchange
queue: order_queue
routingKey: order
``
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!