RocketMQ 定时任务案例
介绍
RocketMQ 是一个分布式消息中间件,广泛应用于异步通信、解耦系统、流量削峰等场景。除了常规的消息传递功能,RocketMQ 还支持定时消息和延时消息,这使得它非常适合用于实现定时任务。
定时任务是指在一定时间后执行的任务,例如订单超时取消、定时推送通知等。RocketMQ 的延时消息功能可以帮助我们轻松实现这些场景。
RocketMQ 延时消息
RocketMQ 提供了延时消息的功能,允许消息在指定的延迟时间后被消费。RocketMQ 支持的延时级别如下:
延时级别 | 延迟时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
备注
RocketMQ 的延时消息是基于消息的延时级别来实现的,而不是精确的时间点。
实现定时任务的步骤
1. 发送延时消息
首先,我们需要发送一条延时消息。以下是一个使用 Java 客户端发送延时消息的示例:
java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayedMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息,指定Topic、Tag和消息体
Message msg = new Message("TestTopic", "TagA", "Hello RocketMQ".getBytes());
// 设置延时级别为3,即10秒后消费
msg.setDelayTimeLevel(3);
// 发送消息
producer.send(msg);
// 关闭生产者
producer.shutdown();
}
}
2. 消费延时消息
接下来,我们需要编写消费者来消费延时消息。以下是一个简单的消费者示例:
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DelayedMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
3. 运行示例
- 启动 RocketMQ 的 NameServer 和 Broker。
- 运行
DelayedMessageProducer
,发送一条延时消息。 - 运行
DelayedMessageConsumer
,等待10秒后,消费者将接收到消息并打印出来。
实际应用场景
订单超时取消
在电商系统中,用户下单后如果在一定时间内未支付,订单将被自动取消。我们可以使用 RocketMQ 的延时消息来实现这一功能。
- 用户下单时,发送一条延时消息,延时时间为订单超时时间(例如30分钟)。
- 消费者在接收到消息后,检查订单状态,如果订单未支付,则取消订单。
定时推送通知
在某些应用中,需要定时向用户推送通知。例如,每天早上8点推送天气预报。
- 计算当前时间到目标时间的延时时间。
- 发送一条延时消息,延时时间为计算出的时间。
- 消费者在接收到消息后,执行推送通知的逻辑。
总结
RocketMQ 的延时消息功能为定时任务的实现提供了简单而强大的支持。通过发送延时消息,我们可以在指定的时间后触发任务的执行,适用于订单超时取消、定时推送通知等多种场景。
附加资源
练习
- 修改
DelayedMessageProducer
,发送一条延时1小时的消息,并验证消费者是否在1小时后接收到消息。 - 实现一个简单的订单超时取消系统,使用 RocketMQ 的延时消息来处理订单超时逻辑。