美好365app官方下载-beat365体育ios版下载-365bet手机客户端

4.RabbitMQ - 延迟消息

RabbitMQ延迟消息 文章目录 RabbitMQ延迟消息一、延迟消息介绍二、实现2.1 死信交换机2.2 延迟消息插件2.3 取消超时订单 一、延迟消息介绍 延迟消

4.RabbitMQ - 延迟消息

RabbitMQ延迟消息

文章目录

RabbitMQ延迟消息一、延迟消息介绍二、实现2.1 死信交换机2.2 延迟消息插件2.3 取消超时订单

一、延迟消息介绍

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息

用户下单抢购,抢到了但是没有付款,此时其实库存的数量已经扣减了

如果用户迟迟没有付款,超过一定的时间,就会将此订单取消掉,库存的数量也会重新加回来

我们可以定义一个定时任务扫描数据中订单的状态,超过一定时间没有付款的,我们就将订单取消

延迟任务:设置在一定时间之后才执行的任务

当用户下单成功后,立刻向MQ中发送一条延迟消息,设定延迟时间30分钟,30分钟到了之后就可以收到此消息,检查订单状态,如果发现未支付,则订单直接取消。

这样解决了实效性的问题,同时对数据库的压力也很小

二、实现

2.1 死信交换机

当队列满足下列的条件之一时就会称为死信(dead letter)

消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

消费者不要这个消息了

消息是一个过期的消息(达到了队列或消息本身设置的过期时间),超时无人消费

要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

死信交换机只是一种称呼,和普通的交换机其实是一样的

我们不给simple.queue队列绑定消费者,给dlx.queue绑定一个消费者

因为simple.queue队列没有消费者,所以不会有人来消费,当有人通过simple.direct交换机向simple.queue队列发送一条过期时间为30秒的消息,此消息就会在simple.queue队列卡主

过了30s后,消息就会自动投递到dlx.direct死信交换机,然后进入dlx.queue队列,最终消费者拿到后会进行消费

利用死信交换机、死信队列。过期时间的方式,模拟出了延迟消息的效果

验证一下

在控制台创建simple.direct交换机

将此交换机与simple.queue绑定

注意!simple.queue并没有绑定到消费者,进入到simple.queue队列的消息都会变成死信

创建队列dlx.queue和dlx.direct并将其绑定

创建队列

创建交换机

进行绑定

给simple.queue队列设定死信交换机

注意,这个地方只能是在创建队列的时候进行绑定

在消费者模块代码中定义两个队列simple.queue、dlx.queue

//检查一下,一定不要有simple.queue的消费者

//@RabbitListener(queues = "simple.queue")

//public void listenSimpleQueue(String msg){

// System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");

// throw new RuntimeException("抛出异常了");

//}

@RabbitListener(queues = "dlx.queue")

public void listenDlxQueue(String msg){

log.info("消费者收到了dlx.queue的消息:【" + msg +"】");

}

发送消息

在控制台中下面的这个属性是带过期时间的属性

Java代码中的发送消息如下所示

@Test

void testSendTTLMessage() {

Message message = MessageBuilder

.withBody("hello".getBytes(StandardCharsets.UTF_8))

.setExpiration("10000") //过期时间10s

.build();

//发送到死信队列

rabbitTemplate.convertAndSend("simple.direct", "hi", message);//直接向队列发送消息

log.info("消息发送成功!");

}

simple.queue队列中10s内始终存在下面的一条消息

dlx.queue队列的消费者在10s后会接收到消息

2.2 延迟消息插件

这种定时功能,都是有一定的性能损耗的(Redis除外)

MQ或者Spring的定时功能是在程序内部维护一个时钟,比如每隔一秒就往前跳一次,这种时钟的运行过程中CPU就需要不停地计算,定时任务越多,对于CPU的占用越大,定时任务属于一种CPU密集型的任务

采用延迟消息带来的弊端就是给服务器CPU造成的额外压力比较大

使用交换机实现延迟消息非常的繁琐,需要定义很多的交换机和队列,而且死信交换机的目的是为了让我们人工处理死信消息,并不是为了延迟消息而生的

延迟消息的插件能自动实现延迟效果

RabbitMQ官方也推出了一个插件,原生支持延迟消息功能。

该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

暂存的时间取决于发消息时配置的时间(也就是延迟时间)

在Java代码中配置延迟交换机的两种方式

在声明交换机的时候,需要多添加一个参数delayed=“true”

注解的方式

在消费者模块声明交换机、队列

@RabbitListener(bindings = @QueueBinding(

//队列

value = @Queue(name = "delay.queue", durable = "true"),

//交换机

exchange = @Exchange(name = "hmall.direct", delayed = "true"),

//Routing key

key = "delay"

))

public void listenDelayMessage(String msg) {

log.info("接收到delay.queue的延迟消息【" + msg + "】");

}

注入Bean的方式

这种方式只声明了交换机

@Bean

public DirectExchange delayExchange(){

return ExchangeBuilder

.directExchange("delay.direct")

.delayed() //设置delay的属性为true 主要是这个

.durable(true) //持久化

.build();

}

发送延迟消息的Java代码

@Test

void testSendDelayMessage() {

// Message message = MessageBuilder

// .withBody("hello".getBytes(StandardCharsets.UTF_8))

// .setExpiration("10000") //过期时间10s

// .build();

// //发送到死信队列

rabbitTemplate.convertAndSend("dela.direct", "hi", "hello", new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration("10000");//延迟十秒

return message;

}

});//直接向队列发送消息

log.info("消息发送成功!");

}

2.3 取消超时订单

设置三十分钟后检测订单支付状态,存在两个问题

如果并发比较高,30分钟可能堆积消息过多,对MQ的压力很大

大多数订单在下单后1分钟内就会支付,但是却需要再MQ内等待30分钟,浪费资源

30分钟太长,可以缩短为10s,10s后立刻来检查有没有支付

假如10s后没有支付,可以再发一个10s的延迟消息,直到成功后不再发送延迟消息

这样的话MQ的压力会减少很多

处理如下所示:

查询支付状态的时候,需要先查询本地,之后再查询支付服务,查完之后判断支付状态

定义延时消息时间数组

package com.hmall.common.domain;

import com.hmall.common.utils.CollUtils;

import lombok.Data;

import java.util.List;

@Data

public class MultiDelayMessage {

/**

* 消息体

*/

private T data;

/**

* 记录延迟时间的集合

*/

private List delayMillis;

public MultiDelayMessage(T data, List delayMillis) {

this.data = data;

this.delayMillis = delayMillis;

}

public static MultiDelayMessage of(T data, Long ... delayMillis){

return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));

}

/**

* 获取并移除下一个延迟时间

* @return 队列中的第一个延迟时间

*/

public Long removeNextDelay(){

return delayMillis.remove(0);

}

/**

* 是否还有下一个延迟时间

*/

public boolean hasNextDelay(){

return !delayMillis.isEmpty();

}

}

定义好对应的交换机和队列

@Component

@RequiredArgsConstructor

public class PayStatusListener {

private final IOrderService orderService;

@RabbitListener(bindings = @QueueBinding(

value = @Queue(name = "mark.order.pay.queue", durable = "true"),

exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),

key = "pay.success"

))

public void listenOrderPay(Long orderId) {

/* // 1.查询订单

Order order = orderService.getById(orderId);

// 2.判断订单状态是否为未支付

if(order == null || order.getStatus() != 1){

// 订单不存在,或者状态异常

return;

}

// 3.如果未支付,标记订单状态为已支付

orderService.markOrderPaySuccess(orderId);*/

// update order set status = 2 where id = ? AND status = 1

orderService.lambdaUpdate()

.set(Order::getStatus, 2)

.set(Order::getPayTime, LocalDateTime.now())

.eq(Order::getId, orderId)

.eq(Order::getStatus, 1)

.update();

}

}

在方法中发送延迟检查订单状态的消息

@Override

@GlobalTransactional

public Long createOrder(OrderFormDTO orderFormDTO) {

// 1.订单数据

Order order = new Order();

// 1.1.查询商品

List detailDTOS = orderFormDTO.getDetails();

// 1.2.获取商品id和数量的Map

Map itemNumMap = detailDTOS.stream()

.collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));

Set itemIds = itemNumMap.keySet();

// 1.3.查询商品

List items = itemClient.queryItemByIds(itemIds);

if (items == null || items.size() < itemIds.size()) {

throw new BadRequestException("商品不存在");

}

// 1.4.基于商品价格、购买数量计算商品总价:totalFee

int total = 0;

for (ItemDTO item : items) {

total += item.getPrice() * itemNumMap.get(item.getId());

}

order.setTotalFee(total);

// 1.5.其它属性

order.setPaymentType(orderFormDTO.getPaymentType());

order.setUserId(UserContext.getUser());

order.setStatus(1);

// 1.6.将Order写入数据库order表中

save(order);

// 2.保存订单详情

List details = buildDetails(order.getId(), items, itemNumMap);

detailService.saveBatch(details);

// 3.扣减库存

try {

itemClient.deductStock(detailDTOS);

} catch (Exception e) {

throw new RuntimeException("库存不足!");

}

// 4.清理购物车商品

// cartClient.deleteCartItemByIds(itemIds);

try {

rabbitTemplate.convertAndSend(

MqConstants.TRADE_EXCHANGE_NAME, MqConstants.ORDER_CREATE_KEY,

itemIds/*,

new RelyUserInfoMessageProcessor()*/

);

} catch (AmqpException e) {

log.error("清理购物车的消息发送异常", e);

}

// 5.延迟检测订单状态消息

try {

MultiDelayMessage msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);

rabbitTemplate.convertAndSend(

MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,

new DelayMessageProcessor(msg.removeNextDelay().intValue())

);

} catch (AmqpException e) {

log.error("延迟消息发送异常!", e);

}

return order.getId();

}

将MessagePostProcessMessage对象提取出来了,不用每次都new了

@RequiredArgsConstructor

public class DelayMessageProcessor implements MessagePostProcessor {

private final int delay;

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setDelay(delay);

return message;

}

}

或者是使用下面视频里面的代码

但是下面的代码每次都要使用一个内部类

← 上一篇: 镌:镌怎么读,镌字什么意思?
下一篇: 成都钱多多招聘怎么样:探索岗位优势与企业文化 • 本地金 →

相关推荐

打卡考勤机怎么设置?

打卡考勤机怎么设置?

打卡考勤机怎么设置?——全面指南与实用建议 随着企业信息化和数字化转型的推进,考勤机已成为企业管理中不可或缺的一部分。无论是中小

日本队率先获得了2026年国际足联(FIFA)北中美世界杯决赛圈入场券。 连续8次进入世界杯决赛圈。相反,在同一天韩国队与排名第80位的阿曼队的对决中,韩国队最终以平局收场。森保一率领的日本国家足球队..

日本队率先获得了2026年国际足联(FIFA)北中美世界杯决赛圈入场券。 连续8次进入世界杯决赛圈。相反,在同一天韩国队与排名第80位的阿曼队的对决中,韩国队最终以平局收场。森保一率领的日本国家足球队..

사진 확대 图为,韩国男子足球队主教练洪明甫正在观看阿曼队与B组第7场比赛。 =照片来源。 日本队率先获得了2026年国际足联(FIFA)北中美世

海带一般焯水焯几分钟?掌握这个秘诀,美味翻倍!

海带一般焯水焯几分钟?掌握这个秘诀,美味翻倍!

厨房里,海带总是让人又爱又恨——爱它的鲜嫩爽口,恨它煮不好就硬得嚼不动!每次准备凉拌海带或炖汤时,你是否也纠结过:“海带一般焯

6平方线功率(6平方的电线能带多少千瓦6平方的电线能带多少千瓦)

6平方线功率(6平方的电线能带多少千瓦6平方的电线能带多少千瓦)

本文目录6平方的电线能带多少千瓦6平方的电线能带多少千瓦6平方线能带多少千瓦6个平方电线可以带多少千瓦6平方的电线能承受多大功率6平方

30帧和60帧的区别,拍视频用30帧还是60帧更好,手机如何剪辑高清60帧视频

30帧和60帧的区别,拍视频用30帧还是60帧更好,手机如何剪辑高清60帧视频

有关帧率的问题,到底是30帧好还是60帧好,这个要看我们的视频的用途是什么。一般情况下,30帧和60帧的视频,靠人类肉眼基本是辨别不出来

解决电脑无网络问题的八大步骤,轻松恢复上网体验

解决电脑无网络问题的八大步骤,轻松恢复上网体验

在现代生活中,互联网已成为我们工作和生活中不可或缺的一部分,然而,电脑忽然没有网络连接的困扰却时有发生。无论是因工作需要还是个