一、概述
Apache RocketMQ是一个采用Java语言开发的分布式的消息系统,由阿里巴巴团队开发,与2016年底贡献给Apache,成为了Apache的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了 集 团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转(在 2017 年的双十一当天,整个阿里巴巴集团通过 RocketMQ 流转的线上消息达到了 万亿级,峰值 TPS 达到 5600 万),在阿里大中台策略上发挥着举足轻重的作用 。
地址:http://rocketmq.apache.org/
1.1 核心概念

- Producer
- 消息生产者,负责产生消息,一般由业务系统负责产生消息。
- Producer Group 一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
- Consumer
- 消息费者,负责消费消息,一般是后台系统负责异步消费。
- Push Consumer 服务端向消费者端推送消息
- Pull Consumer 消费者端向服务定时拉取消息
- Consumer Group 一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
- NameServer
- 集群架构中的组织协调员
- 收集broker的工作情况
- 不负责消息的处理
- Broker
- 是RocketMQ的核心负责消息的发送、接收、高可用等(真正干活的)
- 需要定时发送自身情况到NameServer,默认10秒发送一次,超时2分钟会认为该broker失效。
- Topic
- 不同类型的消息以不同的Topic名称进行区分,如User、Order等 是逻辑概念
- Message Queue 消息队列,用于存储消息
1.2 Message数据结构
| 字段名 | 默认值 | 说明 |
|---|---|---|
| Topic | null | 必填,线下环境不需要申请,线上环境需要申请后才能使用 |
| Body | null | 必填,二进制形式,序列化由应用决定,Producer 与 Consumer 要协商好序列化形式。 |
| Tags | null | 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念 |
| Keys | null | 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。 |
| Flag | 0 | 选填,完全由应用来设置,RocketMQ 不做干预 |
| DelayTimeLevel | 0 | 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 |
| WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 |
1.3 mq三种消息对比
| 发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
|---|---|---|---|
| 同步发送 | 快 | 有 | 不丢失 |
| 异步发送 | 快 | 有 | 不丢失 |
| 单向发送 | 最快 | 无 | 可能丢失 |
1.4 消费者两种模式
在RocketMQ中,消费者有两种模式,一种是push模式,另一种是pull模式。
push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
pull模式:客户端不断的轮询请求服务端,来获取新的消息。
但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。
区别
Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
RocketMQ采用pull方式实现
1.5 消息实时性
RocketMQ中采用了长轮询的方式实现消息的实时性
长轮询即是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的数据,再返回,然后进入循环周期。
客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求。
1.6 死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
信息特征
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
队列特征
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。死信消息可在控制台查看。
1.7 MQ应用场景
- 解耦合:将几个业务关联的微服务调用修改为基于MQ的异步通知,可以解除微服务之间的业务耦合。同时还提高了业务性能。
- 流量削峰:将突发的业务请求放入MQ中,作为缓冲区。后端的业务根据自己的处理能力从MQ中获取消息,逐个处理任务。流量曲线变的平滑很多
- 延迟队列:基于RabbitMQ的死信队列或者DelayExchange插件,可以实现消息发送后,延迟接收的效果。
二、使用示例
2.1 与springboot整合
依赖
<!--
rocketmq-spring-boot-starter的依赖包是不能直接从中央仓库下载的,需要自己通过源码install到本地仓库的。
https://github.com/apache/rocketmq-spring
-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency配置
# Spring boot application
<NolebasePageProperties />
spring.application.name = itcast-rocketmq
# 多个使用;号隔开
spring.rocketmq.nameServer=172.16.55.185:9876
spring.rocketmq.producer.group=my-group生产者
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SpringProducer {
// 注入rocketMQ的模板
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
*
* @param topic
* @param msg
*/
public void sendMsg(String topic, String msg) {
// 普通消息
this.rocketMQTemplate.convertAndSend(topic, msg);
// 事务消息
Message message = MessageBuilder.withPayload(msg).build();
// myTransactionGroup要和@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")定义的一致
this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup",
topic,
msg,
null);
System.out.println("发送消息成功");
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("消息发送失败!!!!!");
}
};
rocketMQTemplate.asyncSend("order_id",id,callback);
}
}消费者
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = "spring-my-topic",
consumerGroup = "spring-consumer-group",
selectorExpression = "*",
consumeMode = ConsumeMode.CONCURRENTLY
)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("接收到消息 -> " + msg);
}
}事务消息
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;
@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 执行业务逻辑
*
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
try {
System.out.println("执行操作1");
Thread.sleep(500);
System.out.println("执行操作2");
Thread.sleep(800);
STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
return RocketMQLocalTransactionState.UNKNOWN;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.ROLLBACK;
}
/**
* 回查
*
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
System.out.println("回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId));
return STATE_MAP.get(transId);
}
}2.2 原生客户端操作
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>创建topic
DefaultMQProducer producer = new DefaultMQProducer("haoke");
//设置nameserver的地址
producer.setNamesrvAddr("172.16.55.185:9876");
// 启动生产者
producer.start();
/**
* 创建topic,参数分别是:broker的名称,topic的名称,queue的数量
*
*/
producer.createTopic("broker_haoke_im", "my-topic", 8);
System.out.println("topic创建成功!");
producer.shutdown();发送同步消息
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("haoke");
// 设置NameServer的地址
producer.setNamesrvAddr("172.16.55.185:9876");
// 启动Producer实例
producer.start();
//发送消息
String msg = "我的第一个消息6!";
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("my-topic", "delete", msg.getBytes("UTF-8"));
// 发送消息到一个Broker,并返回sendResult结果
SendResult sendResult = producer.send(message);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
System.out.println(sendResult);
// 关闭Producer实例。
producer.shutdown();
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法 //参数二: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");
System.out.println(sendResult);发送异步消息
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("haoke");
// 设置NameServer的地址
producer.setNamesrvAddr("172.16.55.185:9876");
// 启动Producer实例
producer.start();
// 发送消息
String msg = "我的第一个异步发送消息!";
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("my-topic", msg.getBytes("UTF-8"));
// SendCallback接收异步返回结果的回调
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功了!" + sendResult);
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息队列:" + sendResult.getMessageQueue());
System.out.println("消息offset值:" + sendResult.getQueueOffset());
}
public void onException(Throwable e) {
System.out.println("消息发送失败!" + e);
}
});
// producer.shutdown();
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});发送单向消息
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("haoke");
// 设置NameServer的地址
producer.setNamesrvAddr("172.16.55.185:9876");
// 启动Producer实例
producer.start();
// 发送消息
String msg = "我的第一个单向发送消息!";
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("my-topic", msg.getBytes("UTF-8"));
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
// producer.shutdown();
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("haoke-consumer");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅消息,接收的是所有消息
// consumer.subscribe("my-topic", "*");
//完整匹配
//consumer.subscribe("haoke_im_topic", "SEND_MSG");
//或匹配
//consumer.subscribe("haoke_im_topic", "SEND_MSG || SEND_MSG1");
// 自定义属性进行过滤,过滤表达式类似于SQL的where,如:a> 5 AND b ='abc' 默认不支持 需在borker配置文件中添加 enablePropertyFilter=true
// consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));
consumer.subscribe("my-topic", "add || update");
// MessageModel.BROADCASTING 广播模式消费 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
// MessageModel.CLUSTERING 负载均衡模式消费 多个消费者共同消费队列消息,每个消费者处理的消息不同
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("接收到消息 -> " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();三、核心知识
3.1 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
- 顺序发送消息
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
//第三个参数用于队列的选择
rocketMQTemplate.syncSendOrderly("test-topic-1", "这是一条异步顺序消息", "xxxx");- 顺序消费消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");3.2 延时消息
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
// org/apache/rocketmq/store/config/MessageStoreConfig.java
// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();3.3 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
小于4MB
javaString topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //处理error }大于时
java// 消息切割 public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // 增加日志的开销20字节 if (tmpSize > SIZE_LIMIT) { //单个消息超过了最大的限制 //忽略,否则会阻塞分裂的进程 if (nextIndex - currIndex == 0) { //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } //把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //处理error } }
3.4 过滤消息
消费者可根据tag和sql表达式进行消息筛选。
根据标签过滤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");sql基本语法
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
消费者过滤
public void subscribe(finalString topic, final MessageSelector messageSelector)生产者设置消息属性
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();消费者根据sql筛选
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();3.5 分布式事务消息
分布式事务分类有这几种:
- 基于单个JVM,数据库分库分表了(跨多个数据库)。
- 基于多JVM,服务拆分了(不跨数据库)。
- 基于多JVM,服务拆分了 并且数据库分库分表了。
解决分布式事务问题的方案有很多,使用消息实现只是其中的一种。

原理
Half(Prepare) Message:
指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
Message Status Check:
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
实现流程
- 发送方向 MQ 服务端发送消息。
- MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
代码实现
本地事务处理
当发送半消息成功时,我们使用
executeLocalTransaction方法来执行本地事务。checkLocalTranscation方法用于检查本地事务状态,并回应消息队列的检查请求。javaimport org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import java.util.HashMap; import java.util.Map; // 返回commit状态时,消费者能够接收到消息,返回rollback状态时,消费者接受不到消息。 public class TransactionListenerImpl implements TransactionListener { private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>(); /** * 执行具体的业务逻辑 * * @param msg 发送的消息对象 * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { System.out.println("用户A账户减500元."); Thread.sleep(500); //模拟调用服务 // System.out.println(1/0); System.out.println("用户B账户加500元."); Thread.sleep(800); STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE); // 二次提交确认 // return LocalTransactionState.UNKNOW; return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { e.printStackTrace(); } STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE); // 回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } /** * 消息回查 * * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("状态回查 ---> " + msg.getTransactionId() +" " +STATE_MAP.get(msg.getTransactionId()) ); return STATE_MAP.get(msg.getTransactionId()); } }生产者
javaTransactionMQProducer producer = new TransactionMQProducer("transaction_producer"); producer.setNamesrvAddr("172.16.55.185:9876"); // 设置事务监听器 producer.setTransactionListener(new TransactionListenerImpl()); producer.start(); // 发送消息 Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8")); producer.sendMessageInTransaction(message, null); Thread.sleep(999999); producer.shutdown();消费者
javaDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_CONSUMER"); consumer.setNamesrvAddr("172.16.55.185:9876"); // 订阅topic,接收此Topic下的所有消息 consumer.subscribe("pay_topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
3.6 消息模式
DefaultMQPushConsumer实现了自动保存offset值以及实现多个consumer的负载均衡。
//设置组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");通过groupname将多个consumer组合在一起,而后指定消息模式进行分配消息
- 集群模式
- 同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消息的一部分内容, 同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
- 广播模式
- 同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消息会被多次分发,被多个 Consumer消费。
// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);3.7 重复消息的解决方案
造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
消费端处理消息的业务逻辑保持幂等性
保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。 RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);订阅方收到消息时可以根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});3.8 数据持久化
数据要进行持久化存储。

流程
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
RocketMQ中的消息数据存储,采用了零拷贝技术(使用 mmap + write 方式),文件系统采用 Linux Ext4 文件系统进行存储。
消息数据的存储
在RocketMQ中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ尽可能保证顺序写入,顺序写入的效率比随机写入的效率高很多。
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,CommitLog是真正存储数据的文件,ConsumeQueue是索引文件,存储数据指向到物理文件的配置。
- 消息主体以及元数据都存储在CommitLog当中
- Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
- 每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
同步刷盘与异步刷盘
RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,分别是同步刷盘与异步刷盘。
- 同步刷盘
- 在返回写成功状态时,消息已经被写入磁盘 。
- 具体流程是:消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态 。
- 异步刷盘
- 在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大
- 当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
- broker配置文件中指定刷盘方式
- flushDiskType=ASYNC_FLUSH -- 异步
- flushDiskType=SYNC_FLUSH -- 同步
3.9 消息重试策略
不同类型消息重试
有序消息重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息
在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试需要分2种,分别是producer端重试和consumer端重试。
producer端重试
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("172.16.55.185:9876");
//消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.start();
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic", "SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,并且指定超时时间
SendResult sendResult = producer.send(msg, 1000);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
System.out.println(sendResult);
producer.shutdown();
}
}consumer端重试
消费者端的失败,分为2种情况,一个是exception,一个是timeout。
- exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
消息的状态
package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}消息的状态分为成功或者失败
如果返回的状态为失败时,broker的日志会有这样的信息
INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,接收此Topic下的所有消息
consumer.subscribe("my-test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
if(msgs.get(0).getReconsumeTimes() >= 3){
// 重试3次后,不再进行重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
}重试配置
开启重试
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}关闭重试
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}自定义消息最大重试次数
- 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
- 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
- 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);- timeout
由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止! 也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。
3.10 高可用
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
1. 消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
2. 消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。
3. 消息主从赋值
消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
同步赋值
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。
异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
3.11 负载均衡
1. Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。
2. Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely。
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式
集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。
四、拓展知识
4.1 broker访问不到的问题
broker使用的是内部地址,外部访问不了,需配置
#创建broker配置文件
vim /haoke/rmq/rmqbroker/conf/broker.conf
brokerIP1=172.16.55.185
namesrvAddr=172.16.55.185:9876
brokerName=broker_haoke_im
#启动broker,通过 -c 指定配置文件
bin/mqbroker -c /haoke/rmq/rmqbroker/conf/broker.conf4.2 如何确保消息的不丢失
RabbitMQ针对消息传递过程中可能发生问题的各个地方,给出了针对性的解决方案:
- 生产者发送消息时可能因为网络问题导致消息没有到达交换机:
- RabbitMQ提供了publisher confirm机制
- 生产者发送消息后,可以编写ConfirmCallback函数
- 消息成功到达交换机后,RabbitMQ会调用ConfirmCallback通知消息的发送者,返回ACK
- 消息如果未到达交换机,RabbitMQ也会调用ConfirmCallback通知消息的发送者,返回NACK
- 消息超时未发送成功也会抛出异常
- RabbitMQ提供了publisher confirm机制
- 消息到达交换机后,如果未能到达队列,也会导致消息丢失:
- RabbitMQ提供了publisher return机制
- 生产者可以定义ReturnCallback函数
- 消息到达交换机,未到达队列,RabbitMQ会调用ReturnCallback通知发送者,告知失败原因
- RabbitMQ提供了publisher return机制
- 消息到达队列后,MQ宕机也可能导致丢失消息:
- RabbitMQ提供了持久化功能,集群的主从备份功能
- 消息持久化,RabbitMQ会将交换机、队列、消息持久化到磁盘,宕机重启可以恢复消息
- 镜像集群,仲裁队列,都可以提供主从备份功能,主节点宕机,从节点会自动切换为主,数据依然在
- RabbitMQ提供了持久化功能,集群的主从备份功能
- 消息投递给消费者后,如果消费者处理不当,也可能导致消息丢失
- SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略:
- 消费者的确认机制:
- 消费者处理消息成功,未出现异常时,Spring返回ACK给RabbitMQ,消息才被移除
- 消费者处理消息失败,抛出异常,宕机,Spring返回NACK或者不返回结果,消息不被异常
- 消费者重试机制:
- 默认情况下,消费者处理失败时,消息会再次回到MQ队列,然后投递给其它消费者。Spring提供的消费者重试机制,则是在处理失败后不返回NACK,而是直接在消费者本地重试。多次重试都失败后,则按照消费者失败处理策略来处理消息。避免了消息频繁入队带来的额外压力。
- 消费者失败策略:
- 当消费者多次本地重试失败时,消息默认会丢弃。
- Spring提供了Republish策略,在多次重试都失败,耗尽重试次数后,将消息重新投递给指定的异常交换机,并且会携带上异常栈信息,帮助定位问题。
- 消费者的确认机制:
- SpringAMQP基于RabbitMQ提供了消费者确认机制、消费者重试机制,消费者失败处理策略:
4.3 避免消息堆积
消息堆积问题产生的原因往往是因为消息发送的速度超过了消费者消息处理的速度。因此解决方案无外乎以下三点:
- 提高消费者处理速度
- 增加更多消费者
- 增加队列消息存储上限
1)提高消费者处理速度
消费者处理速度是由业务代码决定的,所以我们能做的事情包括:
- 尽可能优化业务代码,提高业务性能
- 接收到消息后,开启线程池,并发处理多个消息
优点:成本低,改改代码即可
缺点:开启线程池会带来额外的性能开销,对于高频、低时延的任务不合适。推荐任务执行周期较长的业务。
2)增加更多消费者
一个队列绑定多个消费者,共同争抢任务,自然可以提供消息处理的速度。
优点:能用钱解决的问题都不是问题。实现简单粗暴
缺点:问题是没有钱。成本太高
3)增加队列消息存储上限
在RabbitMQ的1.8版本后,加入了新的队列模式:Lazy Queue
这种队列不会将消息保存在内存中,而是在收到消息后直接写入磁盘中,理论上没有存储上限。可以解决消息堆积问题。
优点:磁盘存储更安全;存储无上限;避免内存存储带来的Page Out问题,性能更稳定;
缺点:磁盘存储受到IO性能的限制,消息时效性不如内存模式,但影响不大。
4.4 保证消息的有序性
其实RabbitMQ是队列存储,天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证了。
因此,要保证消息的有序性,需要做的下面几点:
- 保证消息发送的有序性
- 保证一组有序的消息都发送到同一个队列
- 保证一个队列只包含一个消费者
4.5 防止MQ消息被重复消费
消息重复消费的原因多种多样,不可避免。所以只能从消费者端入手,只要能保证消息处理的幂等性就可以确保消息不被重复消费。
而幂等性的保证又有很多方案:
- 给每一条消息都添加一个唯一id,在本地记录消息表及消息状态,处理消息时基于数据库表的id唯一性做判断
- 同样是记录消息表,利用消息状态字段实现基于乐观锁的判断,保证幂等
- 基于业务本身的幂等性。比如根据id的删除、查询业务天生幂等;新增、修改等业务可以考虑基于数据库id唯一性、或者乐观锁机制确保幂等。本质与消息表方案类似。
五、安装
5.1 压缩包安装
由于mq默认是生产环境配置,内存配置较大,需手动修改
#调整默认的内存大小参数
cd bin/
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
cd bin/
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"# 解压
unzip rocketmq-all-4.3.2-bin-release.zip
cd rocketmq-all-4.3.2-bin-release
#启动nameserver
bin/mqnamesrv
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#启动broker
bin/mqbroker -n 172.16.185.55:9876 #-n 指定nameserver地址和端口 -c 指定配置文件路径
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker5.2 通过docker安装
#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver/logs:/opt/logs \
-v /haoke/rmq/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#创建broker容器
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker/logs:/opt/logs \
-v /haoke/rmq/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver rmqbroker
#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver5.3 测试
cd bin
#测试发送
tools.sh org.apache.rocketmq.example.quickstart.Producer
#测试接收
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer5.4 部署RocketMQ的管理工具
RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
docker安装方式
shell#拉取镜像 docker pull styletang/rocketmq-console-ng:1.0.0 #创建并启动容器 docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.16.55.185:9876 - Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng:1.0.0普通方式
shgit clone https://github.com/apache/rocketmq-externals cd rocketmq-console mvn clean package -Dmaven.test.skip=true注意:打包前在
rocketmq-console中配置namesrv集群地址:shrocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876启动rocketmq-console:
shjava -jar rocketmq-console-ng-1.0.0.jar启动成功后,我们就可以通过浏览器访问
http://localhost:8080进入控制台界面了
5.5 集群
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
集群模式
在RocketMQ中,集群的部署模式是比较多的,有以下几种:
单个Master
- 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master
**优点:**配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
**缺点:**单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多Master多Slave模式,异步复制
- 每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预。性能同多Master模式几乎一样。
- 缺点:Master宕机,磁盘损坏情况,会丢失少量消息。
多Master多Slave模式,同步双写
- 每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,只有主备都写成功,才向应用返回成功。
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
- 缺点:性能比异步复制模式略低,大约低10%左右。发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
搭建2m2s集群
通过docker搭建2master+2slave的集群。
集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
#创建2个master
#nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver01/logs:/opt/logs \
-v /haoke/rmq/rmqserver01/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
#nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /haoke/rmq/rmqserver02/logs:/opt/logs \
-v /haoke/rmq/rmqserver02/store:/opt/store \
foxiswho/rocketmq:server-4.3.2#创建第1个master broker
#master broker01
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker01/logs:/opt/logs \
-v /haoke/rmq/rmqbroker01/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#配置
#nameServer地址,分号分割
namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
#所属集群名字
brokerClusterName=ItcastCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker01
#0 表示 Master,>0 表示 Slave
brokerId=0
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
#创建第2个master broker
#master broker02
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker02/logs:/opt/logs \
-v /haoke/rmq/rmqbroker02/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#master broker02
namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10811
#创建第1个slave broker
#slave broker01
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker03/logs:/opt/logs \
-v /haoke/rmq/rmqbroker03/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker01
namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10711
#创建第2个slave broker
#slave broker01
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /haoke/rmq/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /haoke/rmq/rmqbroker04/logs:/opt/logs \
-v /haoke/rmq/rmqbroker04/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker02
namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=ItcastCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10611#启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04六、mqadmin管理工具
6.1 使用方式
进入RocketMQ安装位置,在bin目录下执行./mqadmin {command} {args}
6.2 命令介绍
1. Topic相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) | ||
| -h- | 打印帮助 | ||
| -n | NameServer服务地址,格式 ip:port | ||
| -p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
| -r | 可读队列数(默认为 8) | ||
| -w | 可写队列数(默认为 8) | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
| deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
| topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
| -c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port | ||
| -p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
| updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic,键 | ||
| -v | orderConf,值 | ||
| -m | method,可选get、put、delete | ||
| allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
| statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -a | 是否只打印活跃topic | ||
| -t | 指定topic |
2. 集群相关
| 名称 | 含义 | 命令选项 | 说明 |
| clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | 打印间隔,单位秒 | ||
| clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
| -s | 消息大小,单位B | ||
| -c | 探测哪个集群 | ||
| -p | 是否打印格式化日志,以|分割,默认不打印 | ||
| -h | 打印帮助 | ||
| -m | 所属机房,打印使用 | ||
| -i | 发送间隔,单位秒 | ||
| -n | NameServer 服务地址,格式 ip:port |
3. Broker相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
| -c | cluster 名称 | ||
| -k | key 值 | ||
| -v | value 值 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址为ip:port |
| -t | 请求超时时间 | ||
| -l | diff阈值,超过阈值才打印 | ||
| -o | 是否为顺序topic,一般为false | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | BrokerName,注意不同于Broker地址 | ||
| -s | 消息大小,单位B | ||
| -c | 发送次数 |
4. 消息相关
| 名称 | 含义 | 命令选项 | 说明 |
| queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
| -t | Topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
| -i | query 队列 id | ||
| -o | offset 值 | ||
| -t | topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | uniqe msg id | ||
| -g | consumerGroup | ||
| -d | clientId | ||
| -t | topic名称 | ||
| checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -a | 探测次数 | ||
| -s | 消息大小 | ||
| sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -p | body,消息体 | ||
| -k | keys | ||
| -c | tags | ||
| -b | BrokerName | ||
| -i | queueId | ||
| consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -b | BrokerName | ||
| -o | 从offset开始消费 | ||
| -i | queueId | ||
| -g | 消费者分组 | ||
| -s | 开始时间戳,格式详见-h | ||
| -d | 结束时间戳 | ||
| -c | 消费多少条消息 | ||
| printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -d | 是否打印消息体 | ||
| printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -i | queueId | ||
| -a | BrokerName | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -p | 是否打印消息 | ||
| -d | 是否打印消息体 | ||
| -f | 是否统计tag数量并打印 | ||
| resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | 消费者分组 | ||
| -t | topic名称 | ||
| -s | 重置为此时间戳对应的offset | ||
| -f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
| -c | 是否重置c++客户端offset |
5. 消费者、消费组相关
| 名称 | 含义 | 命令选项 | 说明 |
| consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
| -s | 是否打印client IP | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | consumer group | ||
| -i | clientId | ||
| -s | 是否执行jstack | ||
| getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
| -t | 查询主题 | ||
| -i | Consumer 客户端 ip | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| -s | 分组是否允许消费 | ||
| -m | 是否从最小offset开始消费 | ||
| -d | 是否是广播模式 | ||
| -q | 重试队列数量 | ||
| -r | 最大重试次数 | ||
| -i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
| -w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
| -a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
| deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -s | 源消费者组 | ||
| -d | 目标消费者组 | ||
| -t | topic名称 | ||
| -o | 暂未使用 |
6. 连接相关
| 名称 | 含义 | 命令选项 | 说明 |
| consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
| -t | 主题名称 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 |
7. NameServer相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
| -k | key | ||
| -v | value | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
| -k | key | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -k | key | ||
| -v | value |
8. 其他
| 名称 | 含义 | 命令选项 | 说明 |
| startMonitoring | 开启监控进程,监控消息误删、重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 |
6.3 注意事项
- 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
- 几乎所有命令都可以通过-h获取帮助
- 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令