RocketMQ笔记2
[toc]
# RocketMQ笔记2
RocketMQ 当前版本5.x
# RocketMQ的使用
# 生产者发送消息
生产者发送消息的步骤
1.创建消息生产者producer,并设置生产者组名
2.指定Nameserver地址
3.启动生产者producer
4.创建消息对象,指定消息的主题Topic、Tag和消息体
5.发送消息对象
6.关闭生产者producer
2
3
4
5
6
# 发送消息的模式
生产者可以发送各种消息。发送的消息主要分为3种。
- 同步消息:生产者需要等待消息的发送结果后,才会继续执行下面的代码。
- 异步消息:生产者不需等待消息发送结果,直接执行下面的代码。生成者主要通过异步函数的形式来接收消息发送结果。
- 单向消息:单向消息是指无需等待发送结果的使用场景。
① 发送同步消息
同步消息是指,发送消息后需要等待服务器的响应结果后,程序才能执行下去。
同步消息主要用于实时性要求高的场景。比如:重要的消息通知,短信通知等。
public class TEST1 {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 1. 实例化生产者Producer,并设置生产者所属的生产者组名
DefaultMQProducer producer = new DefaultMQProducer("myProducer-group01");
// 2. 为生产者设置Nameserver的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3. 启动生产者Producer
producer.start();
// 4. 创建消息对象。主要设置消息的主题topic,标签tag,消息内容
Message msg = new Message("myTopic01","Tag01",("this is a RocketMq Message").getBytes());
// 5. 发送同步消息到一个Broker
SendResult sendResult = producer.send(msg,10000);
// 通过sendResult判断消息是否成功送达
System.out.println("sendResult = "+sendResult);
// 6. 如果不再发送消息,关闭生产者
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
控制台上可以看到生产者发送的消息。
② 发送异步消息
异步消息是指,发送消息后不用等待RocktetMQ的响应结果,程序直接继续执行下去。
异步消息通常用在响应时间有些长的业务场景,即程序本身不能容忍长时间地等待响应结果。例如,上传大文件等
public class TEST1 {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 1. 实例化生产者Producer,并设置生产者所属的生产者组名
DefaultMQProducer producer = new DefaultMQProducer("myProducer-group01");
// 2. 为生产者设置Nameserver的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3. 启动生产者Producer
producer.start();
// 非必须。指定异步消息发送失败后不进行重试发送,发送0次
producer.setRetryTimesWhenSendAsyncFailed(0);
// 4. 创建消息对象。主要设置消息的主题topic,标签tag,消息内容
Message msg = new Message("myTopic01","Tag01",("Hello world").getBytes());
// 5. 发送异步消息到一个Broker,
// 这里有两个回调函数,成功回调和失败回调。
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功。sendResult = "+sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步消息发送失败。");
e.printStackTrace();
}
});
//程序暂停2s,防止异步消息发送之前就把生产者关闭了,从而导致异步消息发送失败。
Thread.sleep(2000);
// 6. 如果不再发送消息,关闭生产者
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
③ 发送单向消息
单向消息是指,发送消息后不关心消息发送结果的场景。通常用在耗时短,且重要性不高的场景。例如日志发送等
public class TEST1 {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 1. 实例化生产者Producer,并设置生产者所属的生产者组名
DefaultMQProducer producer = new DefaultMQProducer("myProducer-group01");
// 2. 为生产者设置Nameserver的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3. 启动生产者Producer
producer.start();
// 4. 创建消息对象。主要设置消息的主题topic,标签tag,消息内容
Message msg = new Message("myTopic01","Tag01",("Hello world2").getBytes());
// 5. 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
// 6. 如果不再发送消息,关闭生产者
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 消费者消费消息
消费者消费消息的步骤
1. 创建消费者Consumer,制定消费者组名
2. 指定Nameserver地址
3. 订阅消息Topic和消息Tag
4. 设置回调函数,编写如何处理消息
5. 启动消费者consumer,来消费消息。
2
3
4
5
消费者基本上都是以监听的方式来实时监听消息队列中的消息。一旦有符合的消息,那么消费者就会去消费它。
代码如下:
public class TEST2 {
public static void main(String[] args) throws Exception {
// 1. 实例化消息消费者,设置消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer-group01");
// 2. 指定Namesrv地址信息.
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3. 指定消费者订阅的消息Topic和消息Tag,*表示所有Tag
consumer.subscribe("myTopic01", "*");
// 4. 设置回调函数,在其中创建一个监听器,用来实时监听并处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//在监听器的方法中处理消息。该方法返回处理消息的结果
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
System.out.println("====开始消费消息====");
System.out.println("消息列表msgs = "+msgs);
//返回处理消息成功的状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息者
consumer.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 消费消息的模式
消费者消费消息的方式有多种。可以通过setMessageModel方法来设置消费者的消费方式。
负载均衡模式(默认的模式)
同一个消费组中的消费者采用负载均衡的方式来消费消息,这种模式下每个消费者处理的消息都是不同的,并且每个消息只会被消费一次。
设置消费者的消费模式为负载均衡模式
// 创建消息生产者,指定组名,topic,tag
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer-group01");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("myTopic01", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
2
3
4
5
6
广播模式
同一个消费组中的消费者采用广播的方式消费消息,这种模式下每个消费者消费的消息都是相同的。
设置消费者的消费模式为广播模式
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
2
# 顺序消息
顺序消息通常用于业务上有先后顺序要求的场景,比如主从数据同步,订单流程操作等。这些必须按照一定的顺序执行,否则会导致错误的结果。
顺序消息的原理
普通发送消息的模式下,生产者会采用Round Robin轮询的方式将消息均匀的分发到Topic中的不同队列中,然后消费者会从Topic中的多个队列上进行消息消费。因为生产者和消费者同时对Topic中的多个队列操作,因此RocketMQ 就无法使用队列有序特性来保证消息有序性了。
但是如果控制生产者发送的消息有序发送到同一个queue中,消费者只从这个queue上依次消费消息,那么就保证了消息的顺序性。
顺序消息一般分为全局顺序消息和局部顺序消息。
- 全局顺序消息: 是指某个Topic下的所有消息都要保证顺序。如果要实现全局顺序消息,那么只能使用一个队列,以及一个生产者,这是会严重影响性能。
- 局部顺序消息:是保证Topic中的每一个队列中的消息能顺序发送和顺序消费。
由于全局顺序消息是会严重影响性能。所以大部分情况下,都是用局部顺序消息。
# 消息的顺序发送和顺序消费
通常情况下,一个Topic中有多个消息队列,生产者发送的消息轮询存储在不同队列中。
如果我们要将消息顺序发送,那么我们必须将一组相同的消息按照顺序存入同一个队列中,就能保证消息有序发送。
例如:一组订单消息,包括订单创建消息,订单支付消息,订单完成消息。这一组订单消息都包含相同的订单ID号。我们可以通过对订单ID号进行Hash取模,来保证同一组订单消息都存放在同一个队列中。
生产者代码如下
public class Producer {
private DefaultMQProducer producer;
//初始化生产者
public Producer() throws MQClientException {
producer = new DefaultMQProducer("myConsumer-group02");
producer.setNamesrvAddr("localhost:9876");
producer.start();
}
//发送消息
public void sendMessage(Order order) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
//创建消息对象,将Order对象传入其中
Message message = new Message("OrderTopic", "orderTAG", JSON.toJSONBytes(order));
//发送消息,通过消息选择器MessageQueueSelector,来选择具体的队列
producer.send(message, new MessageQueueSelector() {
//选择消息队列的select方法
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//对订单ID%队列数量。来选择队列
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, order.getId());
}
//关闭生产者
public void shutdown() {
producer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
RocketMQ提供了一个MessageQueueSelector接口来选择消息队列,我们需要实现这个接口来选择正确的队列。在上面的代码中,我们使用了order.getId()作为参数来选择消息队列。
消费者代码如下:
public class Consumer {
private DefaultMQPushConsumer consumer;
//初始化消费者
public Consumer() throws MQClientException {
//设置消费者的组名
consumer = new DefaultMQPushConsumer("myConsumer-group02");
//设置消费者连接的NameServer
consumer.setNamesrvAddr("localhost:9876");
//设置消费者订阅的topic,tag
consumer.subscribe("OrderTopic", "*");
//注册消费者的监听方法
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//打印消息
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
//返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
//关闭消费者
public void shutdown() {
consumer.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
RocketMQ提供了一个MessageListenerOrderly接口来保证消息的顺序消费,我们只需要实现这个接口即可。
MessageListenerOrderly接口是一种高级的消费方式,可以确保一个线程只消费一个队列中的消息,并且消息按照发送的顺序被消费。
测试代码如下
public static void main(String[] args) {
//创建多个订单
Order order1 = new Order(1, "order1");
Order order2 = new Order(2, "order2");
Order order3 = new Order(3, "order3");
//创建生产者
Producer producer = new Producer();
//生产者发送消息
producer.sendMessage(order1);
producer.sendMessage(order2);
producer.sendMessage(order3);
//暂停
Thread.sleep(1000);
//创建消费者
Consumer consumer = new Consumer();
//暂停
Thread.sleep(5000);
//关闭生产者和消费者
producer.shutdown();
consumer.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
因为发送和消费是异步的,所以我们需要等待一段时间,以便消息被正确地发送和消费。
# 总结
消息顺序发送和顺序消费的条件:
- 发送消息:保证同一类消息能都有序发送到同一条队列中。
- 消费消息:保证同一个消费者,只消息这一个队列中的消息。
# 定时/延时消息
定时/延时消息:是指消息不会马上发送到消息队列中,而是到达某个时间之后才发送给消息队列,让消费者消费。
例如,订单下单后用户可能不会立即支付。通常需要30分钟后,服务端检查订单状态后,才决定是否取消订单。
从程序的角度,就是生产者创建一个订单取消的延时消息。30分钟后若订单支付了,则删除这个消息。若订单没有支付,则将这个消息发送到消息队列,让消费者消费。即让消费者将这个订单取消。
定时/延时消息的原理:即消息发送之后,不会直接存入到队列中,而是临时存入到一个位置,这个位置会被实时监听。当时间到了,就把消息从这个位置再次存入到队列中。
代码如下
Message message = new Message("myTopic01","Tag01",("Hello world3").getBytes());
// 延时消息 10s 后投递
message.setDelayTimeSec(10);
// 延时消息 10000ms 后投递
message.setDelayTimeMs(10_000L);
// 延时消息,定时时间为当前时间 + 10000ms
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// 直接发送消息即可
SendResult result = producer.send(message);
2
3
4
5
6
7
8
9
除此之外,还有更多的方法提供了延时和定时消息的方式。自行查询源码。
# 事务消息
RocketMQ的事务消息是用来保证消息与事务的一致性。
# 什么是消息与事务的一致性?
假如一个本地事务A,这个本地事务A中要完成一些业务逻辑操作(向数据库插入几条记录,向消息队列发送消息)。
当本地事务A需要回滚的时候,我们希望本地事务A中的写库操作和发送消息操作也能够回滚。那么我们该如何做呢?
方式1:先写库,后发消息。
//伪代码
public void test(){
//写库操作,向表中插入几条数据
//注意,暂时不要进行数据库事务提交
orderService.insert("xxx01");
orderService.insert("xxx02");
orderService.insert("xxx03");
//后发送消息
SendResult sendResult = producer.send("xxxx");
//判断消息是否发送成功
if(sendResult.getState().equals("OK")){
//进行数据库事务提交
connection.commit()
}else{
//进行数据库事务回滚
connection.commit()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
方式1的执行过程如下:
- 先写库,暂时不要提交数据库事务。
- 后发送消息。
- 若消息发送成功,则提交数据库事务。否则回滚数据库事务。
方式1有一个问题。即生产者发送消息之后,需要等待消息发送结果之后,才提交/回滚数据库事务。
如果由于网络问题,导致需要等待一段时间后,生产者才能接收到消息发送结果。那么这一段时间内,数据库事务需要一直等待。这样会导致数据库连接资源很容易就耗尽。最终导致其他业务无法进行数据库操作。
方式2:先发消息,后写库。
//伪代码
public void test(){
//先发送消息
SendResult sendResult = producer.send("xxxx");
//判断消息是否发送成功
if(sendResult.getState().equals("OK")){
//写库操作,向表中插入几条数据
orderService.insert("xxx01");
orderService.insert("xxx02");
orderService.insert("xxx03");
//进行数据库事务提交
connection.commit()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
方式2的执行过程如下:
- 先发送消息
- 若消息发送成功,则进行写库。否则,不写库。
方式2有一个问题。如果消息发送成功后,写库操作失败了。那么我们如何撤回发送成功的消息。
方式3: RocketMQ提供了事务消息。
事务消息可以让本地事务和消息保持一致性,即一个业务功能中的本地事务操作和消息发送操作,要么全都完成,要么全都不完成。
# 事务消息的实现流程
再次说明,事务消息是让本地事务和消息保持一致性。即一个业务功能中的本地事务操作和消息发送操作,要么全都完成,要么全都不完成。
如图所示
事务消息实现流程如下:
- 上游服务通过生产者,向MQ发送一条half消息。MQ收到half消息后,会返回一个响应给上游服务。
- 上游服务的生产者接收MQ发送的响应。
- 如果生产者接收到SUCCESS响应,表明half消息发送成功。此时上游服务开始执行本地事务。
- 如果生产者接收到FAIL响应,则尝试重发half消息。此时上游服务不会执行本地事务。
- 如果上游服务的本地事务提交成功/失败,上游服务通过生产者向MQ发送commit/rollback事务状态消息。
- MQ会接收到生产者发送的commit/rollback事务状态消息。
- 如果接收到commit,则MQ会把half消息提交(即half消息变为真正的消息,并持久化到磁盘上)。
- 如果接收到rollback,则MQ会把half消息删除。
- 如果由于网络问题,导致MQ长时间未接收到针对这个half消息的commit/rollback。那么MQ会调用上游服务提供的一个接口,通过这个接口来判断本地事务是否完成以及判断half消息的状态。
- 如果接口中判断本地事务完成,则发送commit
- 如果接口中判断本地事务失败,则发送rollback
当本地事务提交完成,并且消息也发送成功后(即half消息变为真正的消息)。则下游服务可以通过消费者来对这个消息进行消费。
什么是half消息?
half其实也是一个消息。只不过half消息在 RocketMQ 中不会被消费者消费,除非这个half消息被commit了。
half消息在被commit之前,MQ会先放在一个内部队列中。只有half消息被commit了,那么half消息会被放在消费者能读取到的 topic 队列中。
一般情况下,若本地事务提交成功,则生产者commit half消息。若本地事务提交失败,需要进行事务回滚。则生产者rollback half消息。
# 事务消息的问题
事务消息只能保证本地事务和消息的一致性。即一个业务功能中的本地事务操作和消息发送操作,要么全都完成,要么全都不完成。
对于分布式事务,RocketMQ的事务消息只能保证整个分布式事务和消息一致性的一半。即保证的是上游服务的本地事务和消息发送的一致性,而对下游服务的本地事务和消费消息的一致性无法保证。
通常情况下,可以将事务与消息分开操作。即在事务外执行发送消息的操作。通过MQ的消息补偿和重发机制,来保证消息的发送成功。
# 事务消息的代码实现
更新RocketMQ配置类。确保TransactionListenerImpl被正确注册到RocketMQTemplate中。
代码实现过程如下:
- 我们需要将手写事务监听器TransactionListener接口,并实现其中的两个方法。executeLocalTransaction方法和checkLocalTransaction方法。
- 然后将监听器实现类注册到RocketMQTemplate中。
- 然后我们先发送 half 消息。当 half 消息发送成功后,会自动触发监听器实现类的executeLocalTransaction方法,该方法用于执行本地事务。并返回事务提交状态。
- 若MQ长时间未收到本地事务提交状态,则MQ会调用监听器实现类的checkLocalTransaction方法,对本地事务进行检查。并返回事务提交状态
// RocketMQ配置类
@Configuration
public class MQConfig {
@Bean
public TransactionListener transactionListener(TransactionalService transactionalService) {
return transactionalService.new TransactionListenerImpl();
}
@Bean
public RocketMQTemplate rocketMQTemplate(RocketMQTemplate template, TransactionListener transactionListener) {
// 设置事务监听器
template.setTransactionListener(transactionListener);
return template;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
业务代码实现
@Service
public class testService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 这个方法用于执行实际的业务逻辑
@Transactional
public void runBusinessLogic(String msg) {
// 执行本地事务(比如数据库操作)
// 如果本地事务执行失败,可以抛出异常
}
// 该方法用于发送half消息
// 当half消息发送成功后,会自动触发监听器的executeLocalTransaction方法,该方法用于执行本地事务。并返回事务提交状态
// 若MQ长时间未收到事务提交状态,则MQ会调用监听器的checkLocalTransaction方法,对本地事务进行检查。并返回事务提交状态
public void sendHalfMessage(String msg) {
rocketMQTemplate.sendMessageInTransaction("test-topic", MessageBuilder.withPayload(msg).build(), null);
}
// 事务监听器,需要自己手动实现。
private class TransactionListenerImpl implements TransactionListener {
//执行本地事务方法
@Override
public LocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
try {
//执行本地事务
runBusinessLogic(msg.getPayload().toString());
//若本地事务执行成功,则返回COMMIT状态
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
//若本地事务执行异常,则返回ROLLBACK状态
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
//本地事务检测方法
@Override
public LocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
// 查询本地事务是否完成。若完成,则返回COMMIT状态。否则返回ROLLBACK状态
if("xxxx"){
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# SpringBoot整合RocketMQ
① 在springboot项目中添加RocketMQ依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
</dependencies>
2
3
4
5
6
7
② 配置RocketMQ
在配置文件中配置RocketMQ的NameServer和生产者组
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=myProducer-group01
2
③ 发送消息测试
通过RocketMQTemplate工具类,可以用来发送消息。
public class ProducerTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
//convertAndSend方法可以将参数转换为消息对象,并发送出去
public void test1(){
//指定topic和消息内容
rocketMQTemplate.convertAndSend("myTopic","hello springboot rocketmq");
}
}
2
3
4
5
6
7
8
9
10
④ 消费消息测试
创建一个类,用来作为消费者消息消息的。这个类需要添加@RocketMQMessageListener注解和实现RocketMQListener接口。
//该注解用于设置消费者所属的消费组和订阅的topic
@RocketMQMessageListener(topic = "myTopic",consumerGroup = "myConsumer-group02")
public class Consumer implements RocketMQListener<String> {
//重写接口的监听方法,用来消费消息
@Override
public void onMessage(String message) {
//消费消息的具体业务逻辑
log.info("消费了一个消息。message = "+message);
}
}
2
3
4
5
6
7
8
9
10
# RocketMQTemplate的使用
RocketMQTemplate的使用例子:
@Slf4j
@Service
public class TEST {
@Autowired
private RocketMQTemplate template;
/**
* 发送普通消息
*
* @param topic topic
* @param message 消息体
*/
public void sendMessage(String topic, Object message) {
template.convertAndSend(topic, message);
log.info("普通消息发送完成:message = {}", message);
}
/**
* 发送同步消息
*
* @param topic topic
* @param message 消息体
*/
public void syncSendMessage(String topic, Object message) {
SendResult sendResult = template.syncSend(topic, message);
log.info("同步发送消息完成:message = {}, sendResult = {}", message, sendResult);
}
/**
* 发送携带 tag 的消息(过滤消息)
*
* @param topic topic,RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行
* 拆分再组装。只要在指定 topic 时跟上 {:tags} 就可以指定tag
* 例如 test-topic:tagA
* @param message 消息体
*/
public void syncSendMessageWithTag(String topic, Object message) {
template.syncSend(topic, message);
log.info("发送带 tag 的消息完成:message = {}", message);
}
/**
* 发送异步消息
*
* @param topic topic
* @param message 消息体
*/
public void asyncSendMessage(String topic, Object message) {
template.asyncSend(topic, message, new SendCallback() {
//回调成功函数
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功,message = {}, SendStatus = {}", message, sendResult.getSendStatus());
}
//回调失败函数
@Override
public void onException(Throwable e) {
log.info("异步消息发送异常,exception = {}", e.getMessage());
}
});
}
/**
* 发送单向消息
*
* @param topic topic
* @param message 消息体
*/
public void sendOneWayMessage(String topic, Object message) {
template.sendOneWay(topic, message);
log.info("单向发送消息完成:message = {}", message);
}
/**
* 同步发送批量消息
*
* @param topic topic
* @param messageList 消息集合
* @param timeout 超时时间(毫秒)
*/
public void syncSendMessages(String topic, List<Message<?>> messageList, long timeout) {
template.syncSend(topic, messageList, timeout);
log.info("同步发送批量消息完成:message = {}", JSON.toJSONString(messageList));
}
/**
* 发送事务消息
*
* @param topic topic
* @param message 消息对象
*/
public void sendMessageInTransaction(String topic, SysUserDto message) {
//随机创建事务ID
String transactionId = UUID.randomUUID().toString();
//发送事务消息
TransactionSendResult result = template.sendMessageInTransaction(topic, MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build(), message);
log.info("发送事务消息(半消息)完成:result = {}", result);
}
/**
* 同步发送延时消息
*
* @param topic topic
* @param message 消息体
* @param timeout 超时
* @param delayLevel 延时等级:
* 从1s到2h分别对应着等级 1 到 18,消息消费失败会进入延时消息队列
* "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
public void syncSendDelay(String topic, Object message, long timeout, int delayLevel) {
template.syncSend(topic, MessageBuilder.withPayload(message).build(), timeout, delayLevel);
log.info("已同步发送延时消息 message = {}", message);
}
/**
* 异步发送延时消息
*
* @param topic topic
* @param message 消息对象
* @param timeout 超时时间
* @param delayLevel 延时等级
*/
public void asyncSendDelay(String topic, Object message, long timeout, int delayLevel) {
template.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送延时消息成功,message = {}", message);
}
@Override
public void onException(Throwable throwable) {
log.error("异步发送延时消息发生异常,exception = {}", throwable.getMessage());
}
}, timeout, delayLevel);
log.info("已异步发送延时消息 message = {}", message);
}
/**
* 发送单向顺序消息
*
* @param topic topic
*/
public void sendOneWayOrderly(String topic) {
for (int i = 0; i < 30; i++) {
template.sendOneWayOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "topic");
log.info("单向顺序发送消息完成:message = {}", "message - " + i);
}
}
/**
* 同步发送顺序消息
*
* @param topic topic
*/
public void syncSendOrderly(String topic) {
for (int i = 0; i < 30; i++) {
SendResult sendResult = template.syncSendOrderly(topic, MessageBuilder.withPayload("message - " + i).build(), "syncOrderlyKey");
log.info("同步顺序发送消息完成:message = {}, sendResult = {}", "message - " + i, sendResult);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166