一、概述
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。官网
MQ有性能瓶颈,尽量减少消息内容的大小
消息形式
点对点
一个生产者和一个消费者一一对应
默认是存在于MQ的服务器中的,发送消息之后,消费者随时取。但是一定是一个消费者取,消费完消息也就没有了。
发布/订阅模式
一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
topic 默认是不存在于MQ服务器中的,一旦发送之后,如果没有订阅,消息则丢失。
消息正文格式
- StreamMessage Java原始值的数据流
- MapMessage 一套名称-值对
- TextMessage 一个字符串对象
- ObjectMessage 一个序列化的 Java对象
- BytesMessage 一个字节的数据流
1.1 JMS应用程序接口
| 主要接口 | 作用 |
|---|---|
| ConnectionFactory 接口(连接工厂) | 创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和 TopicConnectionFactory两种。 |
| Connection 接口(连接) | Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session |
| Destination 接口(目标) | Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic |
| MessageConsumer 接口(消息消费者) | 消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。 |
| MessageProducer 接口(消息生产者) | 消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。 |
| Message 接口(消息) | 是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:<br/消息头(必须):包含用于识别和为消息寻找路由的操作设置。 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 消息接口非常灵活,并提供了许多方式来定制消息的内容。 |
| Session 接口(会话) | Session 是对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。 |
| MessageListener | 消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。 |
1.2 activeMq服务相关
命令操作
# 启动
<NolebasePageProperties />
./activemq start
# 关闭
./activemq stop
# 查看状态
./activemq status后台及日志
日志文件地址: data/activemq.log
管理后台地址:http://ip地址:8161/admin
1.3 服务配置文件
activemq.xml
activemq.xml 是 spring 配置文件. 其中配置的是 ActiveMQ 应用使用的默认对象组件.transportConnectors 标签 - 配置链接端口信息的. 其中的端口号 61616 是 ActiveMQ 对外发布的 tcp 协议访问端口. 就是 java 代码访问 ActiveMQ 时使用的端口.
jetty.xml
spring 配置文件, 用于配置 jetty 服务器的默认对象组件.jetty 是类似 tomcat 的一个中间件容器.ActiveMQ 默认支持一个网页版的服务查看站点. 可以实现 ActiveMQ 中消息相关数据的页面查看.8161 端口, 是 ActiveMQ 网页版管理站点的默认端口.在 ActiveMQ 网页版管理站点中,需要登录, 默认的用户名和密码都是 admin.
users.properties
内容信息: 用户名=密码 是用于配置客户端通过协议访问 ActiveMQ 时,使用的用户名和密码.
1.4 拓展知识
ActiveMQ宕机了怎么办?
ActiveMQ主从集群方案:Zookeeper集群+ Replicated LevelDB + ActiveMQ集群
官网链接:http://activemq.apache.org/replicated-leveldb-store
如何防止消费方消息重复消费
解决消费方幂等性问题!
如果因为网络延迟等原因,MQ无法及时接收到消费方的应答,导致MQ重试。在重试过程中造成重复消费的问题。
解决思路:
1)如果消费方是做数据库操作,那么可以把消息的ID作为表的唯一主键,这样在重试的情况下,会触发主键冲突,从而避免数据出现脏数据。
2)如果消费方不是做数据库操作,那么可以借助第三方的应用,例如Redis,来记录消费记录。每次消息被消费完成时候,把当前消息的ID作为key存入redis,每次消费前,先到redis查询有没有该消息的消费记录。
如何防止消息丢失?
1)在消息生产者和消费者使用事务
2)在消费方采用手动消息确认(ACK)
3)消息持久化,例如JDBC或日志
二、集成Demo
2.1 与springboot整合
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>配置
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false代码demo
@Service
public class MessageServiceActivemqImpl implements MessageService {
@Autowired
private JmsMessagingTemplate messagingTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列,id:"+id);
messagingTemplate.convertAndSend("order.queue.id",id);
}
@Override
public String doMessage() {
// 接收消息需要先将消息接收到,然后再转换成指定的数据类型
String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);
System.out.println("已完成短信发送业务,id:"+id);
return id;
}
}监听消息
@Component
public class MessageListener {
@JmsListener(destination = "order.queue.id")
@SendTo("order.other.queue.id") // 继续向下传递当前消息到另一个队列中
public String receive(String id){
System.out.println("已完成短信发送业务,id:"+id);
return "new:"+id;
}
/**
* 用于接收消息的方法 * destination: 队列的名称或主题的名称
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}2.1 与spring整合
依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.204.130:61616"></property>
</bean>
<!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnection"></property>
</bean>
<!--2.创建缓存连接工厂-->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!--注入连接工厂-->
<property name="targetConnectionFactory" ref="connetionFactory"/>
<!--缓存消息数据-->
<property name="sessionCacheSize" value="5"/>
</bean>
<!-- 接收和发送消息时使用的类 -->
<bean class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂或连接工厂-->
<property name="connectionFactory" ref="connectionFactory"></property>
<!--指定是否为发布订阅模式-->
<property name="pubSubDomain" value="false"/>
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="mytopic"></constructor-arg>
</bean>
<!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="itemChangeTopic"></constructor-arg>
</bean> -->
<bean id="myMessageListener" class="com.itheima.activemq.spring.MyMessageListener"></bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="topicDestination"></property>
<property name="messageListener" ref="myMessageListener"></property>
</bean>
</beans>代码demo
// 发送消息
public void send() throws Exception{
//1.初始化spring容器
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
//2.获取到jmstemplate的对象
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
//3.获取destination
Destination destination = (Destination) context.getBean(Destination.class);
//4.发送消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello activeMQ");
}
});
Thread.sleep(100000);
}
// 接收消息
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}三、使用demo
使用前需先添加依赖
3.1 Queue(点对点)
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P的特点:
l 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
l 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
l 接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式
生产者
public void testQueueProducer() throws Exception {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
//brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.204.130:61616");
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
//参数:队列的名称。
Queue queue = session.createQueue("queue-test");
// 第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
// 第七步:创建一个Message对象,创建一个TextMessage对象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer对象发送消息。
producer.send(textMessage);
// 第九步:关闭资源。
producer.close();
session.close();
connection.close();
}消费者
public void recieve() throws Exception{
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.204.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
Queue queue = session.createQueue("queue-test");
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
//第一种
// while(true){
// //接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
// Message message = consumer.receive(10000);
// //如果message为空,没有接收到消息了就跳出
// if(message==null){
// break;
// }
//
// if(message instanceof TextMessage){
// TextMessage messaget = (TextMessage)message;
// System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容
// }
// }
System.out.println("start");
//第二种:
//设置监听器,其实开启了一个新的线程。
consumer.setMessageListener(new MessageListener() {
//接收消息,如果有消息才进入,如果没有消息就不会进入此方法
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage messaget = (TextMessage)message;
try {
//获取消息内容
System.out.println(">>>获取的消息内容:"+messaget.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("end");
Thread.sleep(10000);//睡眠10秒钟。
// 9.关闭资源
consumer.close();
session.close();
connection.close();
}3.2 发布订阅
Pub/Sub模式包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。
多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特点
l 每个消息可以有多个消费者
l 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅之后,才能消费发布者的消息
l 为了消费消息,订阅者必须保持运行的状态
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
生产者
public void send() throws Exception{
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.204.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是topic)
Topic topic = session.createTopic("topic-test");//---------------------
// 6.创建生产者
MessageProducer producer = session.createProducer(topic);
// 7.构建消息对象,(构建发送消息的内容) 字符串类型的消息格式(TEXTMessage)
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("发送消息123");// 消息的内容
// 8.发送消息
producer.send(textMessage);
// 9.关闭资源
producer.close();
session.close();
connection.close();
}消费者
public void reieve() throws Exception{
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.204.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
Topic topic = session.createTopic("topic-test");//---------------------
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7.接收消息
while(true){
//接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
Message message = consumer.receive(100000);
//如果message为空,没有接收到消息了就跳出
if(message==null){
break;
}
if(message instanceof TextMessage){
TextMessage messaget = (TextMessage)message;
System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容
}
}
// 第二种:
// 设置监听器,其实开启了一个新的线程。
// consumer.setMessageListener(new MessageListener() {
// // 接收消息,如果有消息才进入,如果没有消息就不会进入此方法
// @Override
// public void onMessage(Message message) {
// if (message instanceof TextMessage) {
// TextMessage messaget = (TextMessage) message;
// try {
// // 获取消息内容
// System.out.println(">>>获取的消息内容:" + messaget.getText());
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// }
// });
//Thread.sleep(10000);// 睡眠10秒钟。
// 9.关闭资源
consumer.close();
session.close();
connection.close();
}四、安全认证
ActiveMQ 也提供了安全认证。就是用户名密码登录规则。使用需在配置文件中开启
<!-- conf/activemq.xml broker 标签 -->
<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<!-- 添加 jaas 认证插件 activemq 在 login.config 里面定义,详细见 login.config-->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism --> <authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>开启认证后,认证使用的用户信息由其他配置文件提供。
#conf/login.config
# user 代表用户信息配置文件,group 代表用户组信息配置文件。寻址路径为相对当前配 置文件所在位置开始寻址。
activemq {
org.apache.activemq.jaas.PropertiesLoginModule required org.apache.activemq.jaas.properties.user="users.properties" org.apache.activemq.jaas.properties.group="groups.properties";
};
# conf/users.properties
#用户名=密码
admin=admin
# conf/groups.properties
#用户组名=用户名,用户名
admins=admin五、持久化
持久化是指对消息数据的持久化。在 ActiveMQ 中,默认的消息是保存 在内存中的。当内存容量不足的时候,或 ActiveMQ 正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。
ActiveMQ 的默认存储策略是 kahadb。如果使用 JDBC 作为持久化策略,则会将所有的 需要持久化的消息保存到数据库中。 所有的持久化配置都在 conf/activemq.xml 中配置,配置信息都在 broker 标签内部定义。
5.1 kahadb 方式
ActiveMQ 默认的持久化策略。kahadb 是一个文件型数据库。是使用内存+文件保证 数据的持久化的。kahadb 可以限制每个数据文件的大小。不代表总计数据容量。
<persistenceAdapter>
<!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter>**特性是:**1、日志形式存储消息;2、消息索引以 B-Tree 结构存储,可以快速更新;3、 完全支持 JMS 事务;4、支持多种恢复机制;
5.2 AMQ方式
只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。
<persistenceAdapter>
<!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 --> <amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/> </persistenceAdapter>性能高于 JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。 为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的 大小都是有限制的(默认 32m,可自行配置)。
当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这 个文件或者归档。
主要的缺点是 AMQ Message 会为每一个 Destination 创建一个索引,如果使用了大量的 Queue,索引文件的大小会占用很多磁盘空间。 而且由于索引巨大,一旦 Broker(ActiveMQ 应用实例)崩溃,重建索引的速度会非常 慢。
虽然 AMQ 性能略高于 Kaha DB 方式,但是由于其重建索引时间过长,而且索引文件 占用磁盘空间过大,所以已经不推荐使用
5.4 JDBC持久化方式
只有在消息必须保证有效,且绝对不能丢失的时候。使用 JDBC 存储策略。
如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用 levelDB 或 Kahadb。
application.yml
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false:点对点队列模式, true:发布/订阅模式
template:
delivery-mode: persistent # 持久化ActiveMQ 将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库 中。 以 MySQL 数据库为例。
修改activemq.xml配置文件,
<broker brokerName="test-broker" persistent="true"xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<!-- dataSource 指定持久化数据库的 bean
createTablesOnStartup 是否在启动的时候创建数据表,默认值是 true
一般是第一次启动的时候设置为 true,之后改成 false。
-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/>
<property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/>
</bean>activemq_msgs 用于存储消息,Queue 和 Topic 都存储在这个表中:
| 字段名 | 含义 |
|---|---|
| ID | 自增的数据库主键 |
| CONTAINER | 消息的 Destination |
| MSGID_PROD | 消息发送者客户端的主键 |
| MSG_SEQ | 是发送消息的顺序,MSGID_PROD+MSG_SEQ 可以组成 JMS 的 MessageID |
| EXPIRATION | 消息的过期时间,存储的是从 1970-01-01 到现在的毫秒数 |
| MSG | 消息本体的 Java 序列化对象的二进制数据 |
| PRIORITY | 优先级,从 0-9,数值越大优先级越高 |
activemq_acks 用于存储订阅关系。如果是持久化 Topic,订阅者和服务器的订阅关系在 这个表保存:
| 字段名 | 含义 |
|---|---|
| CONTAINER | 消息的 Destination |
| SUB_DEST | 如果是使用 Static 集群,这个字段会有集群其他系统的信息 |
| CLIENT_ID | 每个订阅者都必须有一个唯一的客户端 ID 用以区分 |
| SUB_NAME | 订阅者名称 |
| SELECTOR | 选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现, 可支持多属性 AND 和 OR 操作 |
| LAST_ACKED_ID | 记录消费过的消息的 ID。表 activemq_lock 在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。 这个表用于记录哪个 Broker 是当前的 Master Broker |
六、消息失败重发
6.1 消息事务
消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。
一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。生产者、消费者与消息服务器直接都支持事务性;
ActionMQ的事务主要偏向在生产者的应用。
发送时开启事务
/** * 事务性发送--方案一 */
@Test
public void sendMessageTx(){
//获取连接工厂
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
Session session = null;
try {
//创建连接
Connection connection = connectionFactory.createConnection();
/*** 参数一:是否开启消息事务 */
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建生产者
MessageProducer producer = session.createProducer(session.createQueue(name));
for(int i=1;i<=10;i++){
//模拟异常
if(i==4){ int a = 10/0; }
TextMessage textMessage = session.createTextMessage("消息--" + i);
producer.send(textMessage);
}
//注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达 MQ服务器
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事务回滚
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}配置开启事务
配置类:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/****/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}生产者:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/*** 消息发送的业务类 */
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional
// 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
public void sendMessage(){
for(int i=1;i<=10;i++) {
//模拟异常
if(i==4){ int a = 10/0; }
jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
}
}
}消费者:
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/*** 消息消费者 */
@Component
public class Consumer {
/*** 接收消息的方法 */
@JmsListener(destination="${activemq.name}",containerFactory = "jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage,Session session) throws JMSException {
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重发:" + textMessage.getJMSRedelivered());
int i = 100/0;
//模拟异常
session.commit();
//提交事务
} catch (JMSException e) {
try {
session.rollback();
//回滚事务
} catch (JMSException e1) {
e.printStackTrace();
}
}
}6.2 消息确认机制
在Session接口中定义的几个常量:
- AUTO_ACKNOWLEDGE = 1 自动确认
- CLIENT_ACKNOWLEDGE = 2 客户端手动确认
- DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
- SESSION_TRANSACTED = 0 事务提交并确认
springboot配置确认方式
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/****/
@Configuration
public class ActiveMqConfig {
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(false);
// 不开启事务操作
factory.setSessionAcknowledgeMode(1);
//自动确认
return factory;
}
}6.3 消息重发实现
消息消费端在创建Session对象时需要指定应答模式为客户端手动应答,当消费者获取到消息并成功处理后需要调用message.acknowledge()方法进行应答,通知Broker消费成功。
如果处理过程中出现异常,需要调用session.recover()通知Broker重复消息,默认最多重复6次
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class ActiveMQTest {
// 编写消息的发送方----生产者
@Test
public void test1() throws Exception {
// 创建链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 从工厂中获取一个链接对象
Connection connection = connectionFactory.createConnection();
// 连接MQ服务
connection.start();
// 获得session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 通过session对象创建Topic
Topic topic = session.createTopic("itheimaTopic");
// 通过session对象创建消息的发送者
MessageProducer producer = session.createProducer(topic);
// 通过session创建消息对象
TextMessage message = session.createTextMessage("ping");
// 发送消息
producer.send(message);
// 关闭相关资源
producer.close();
session.close();
connection.close();
}
// 编写消息的接收方----消费者
@Test
public void test2() throws Exception {
// 创建链接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 从工厂中获取一个链接对象
Connection connection = connectionFactory.createConnection();
// 连接MQ服务
connection.start();
// 获得session对象
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 通过session对象创建Topic
Topic topic = session.createTopic("itheimaTopic");
// 通过session对象创建消息的消费者
MessageConsumer consumer = session.createConsumer(topic);
//指定消息监听器
consumer.setMessageListener(new MessageListener() {
//当监听的topic中存在消息,这个方法自动执行
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
if(textMessage.getText().equals("ping")){
System.out.println("消费者接收到了消息:" + textMessage.getText());
//客户端手动应答
message.acknowledge();
}else{
System.out.println("消息处理失败了。。。");
//通知MQ进行消息重复,最多可以重发6次
session.recover();
//模拟消息处理失败
int i = 1/0;
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
while(true){
}
}
}七、JMS消息组成
7.1 协议结构
整个JMS协议组成结构如下:
| 结构 | 描述 |
|---|---|
| JMS Provider | 消息中间件/消息服务器 |
| JMS Producer | 消息生产者 |
| JMS Consumer | 消息消费者 |
| JMS Message | 消息,由 消息头、消息体、消息属性三部分组成 |
7.2 消息头
JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:
| 名称 | 描述 |
|---|---|
| JMSDestination | 消息发送的 Destination,在发送过程中由提供者设置 |
| JMSMessageID | 唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设 置的,客户机只能在消息发送后才能确定消息的 JMSMessageID |
| JMSDeliveryMode | 消息持久化。包含值 DeliveryMode.PERSISTENT 或者 DeliveryMode.NON_PERSISTENT。 |
| JMSTimestamp | 提供者发送消息的时间,由提供者在发送过程中设置 |
| JMSExpiration | 消息失效的时间,毫秒,值 0 表明消息不会过期,默认值为0 |
| JMSPriority | 消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证先级 的优先级最高。 为普通消息, 为加急消息。 Q不保证 优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4 |
| JMSCorrelationID | 通常用来链接响应消息与请求消息,由发送消息的 JMS 程序设置。 |
| JMSReplyTo | 请求程序用它来指出回复消息应发送的地方,由发送消息的 JMS 程序设置 |
| JMSType | JMS 程序用它来指出消息的类型。 |
| JMSRedelivered | 消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息 |
不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用以上setJMSXXX()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType
7.3 消息体
在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage--一个字符串对象 *
· MapMessage--一套名称-值对
· ObjectMessage--一个序列化的 Java 对象 *
· BytesMessage--一个字节的数据流 *
· StreamMessage -- Java原始值的数据流TextMessage
写出:
/** * 发送TextMessage消息 */
@Test
public void testMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage("文本消息");
return textMessage;
}
});
}读取:
/** * 接收TextMessage的方法 */
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}MapMessage
发送:
/** * 发送MapMessage消息 */
@Test
public void mapMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","张三");
mapMessage.setInt("age",20);
return mapMessage;
}
});
}接收:
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("名称:"+mapMessage.getString("name"));
System.out.println("年龄:"+mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}ObjectMessage
//发送ObjectMessage消息
@Test public void test2(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User();
user.setName("小苍");
user.setAge(18);
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}接收:
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}注意:ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
packages:
trust-all: true # 添加所有包到信任列表BytesMessage
写出:
//发送BytesMessage消息
@Test
public void test3(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
try {
File file = new File("d:/spring.jpg");
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[(int)file.length()];
in.read(bytes);
bytesMessage.writeBytes(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return bytesMessage;
}
});
}读取:
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage)message;
FileOutputStream out = new FileOutputStream("d:/abc.jpg");
byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
out.write(buf);
out.close();
}StreamMessage
写出:
//发送StreamMessage消息
@Test
public void test4(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("你好,ActiveMQ");
streamMessage.writeInt(20);
return streamMessage; }
});
}读取:
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
StreamMessage streamMessage = (StreamMessage)message;
String str = streamMessage.readString();
int i = streamMessage.readInt();
System.out.println(str);
System.out.println(i);
}7.4 消息属性
给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。
message.setStringProperty("Property",Property); //自定义属性八、消息投递方式
8.1 异步投递
同步发送
消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。
异步发送:
如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞 Producer.send方法。
开启方式
在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true属性
1)如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
2)当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
3)当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。
总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送!!!
配置异步投递的方式
//1.在连接上配置
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
//2. 通过 ConnectionFactory
ConnectionFactory ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
//3. 通过 connection
connection ((ActiveMQConnection)connection).setUseAsyncSend(true);注意:如果是Spring或SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递
@Configuration
public class ActiveConfig {
/*** 配置用于异步发送的非持久化JmsTemplate */
@Autowired
@Bean
public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return template;
}
/*** 配置用于同步发送的持久化JmsTemplate */
@Autowired
@Bean
public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
return template;
}
}如何确认发送成功
异步投递丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg)持续发送消息。
由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。
这时,可以给异步投递方法接收回调,以确认消息是否发送成功!
/** * 异步投递,回调函数 * @return */
@RequestMapping("/send")
public String sendQueue(){
Connection connection = null;
Session session = null;
ActiveMQMessageProducer producer = null;
// 获取连接工厂
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
try {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name);
int count = 10;
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
//创建需要发送的消息
TextMessage textMessage = session.createTextMessage("Hello");
//设置消息唯一ID
String msgid = UUID.randomUUID().toString();
textMessage.setJMSMessageID(msgid);
producer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
// 使用msgid标识来进行消息发送成功的处理
System.out.println(msgid+" 消息发送成功");
}
@Override
public void onException(JMSException exception) {
// 使用msgid表示进行消息发送失败的处理
System.out.println(msgid+" 消息发送失败");
exception.printStackTrace();
}
});
}
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
return "ok";
}8.2 延迟投递
生产者提供两个发送消息的方法,一个是即时发送消息,一个是延时发送消息。
1、修改activemq.xml 添加schedulerSupport="true"配置
<broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" >
......
</broker>2、在代码中设置延迟时长
/** * 延时投递 ** @return */
@Test
public String sendQueue() {
Connection connection = null;
Session session = null;
ActiveMQMessageProducer producer = null;
// 获取连接工厂
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
try {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name);
int count = 10;
producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//创建需要发送的消息
TextMessage textMessage = session.createTextMessage("Hello");
//设置延时时长(延时10秒)
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
producer.send(textMessage);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
return "ok";
}8.3 定时投递
需要借助定时任务框架实现
九、死信队列
DLQ-Dead Letter Queue,死信队列,用来保存处理失败或者过期的消息
出现以下情况时,消息会被重发:
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
当一个消息被重发超过6(缺省为6次)次数时,会给broker发送一个"Poison ack",这个消息被认为是a poison pill,这时broker会将这个消息发送到死信队列,以便后续处理。
注意两点:
1)缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ
2)缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。
可以通过配置文件(activemq.xml)来调整死信发送策略。为每个队列建立独立的死信队列
修改activemq.xml
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>RedeliveryPolicy重发策略设置
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
* 配置类
*/
@Configuration
public class ActiveMQConfig {
//RedeliveryPolicy重发策略设置
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(10);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(2);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${spring.activemq.broker-url}")String url, RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"admin",
"admin",
url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,PlatformTransactionManager transactionManager){
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory ();
factory.setTransactionManager(transactionManager);
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true); // 开启事务
factory.setSessionAcknowledgeMode(1);
return factory;
}
}十、安装
10.1 zookeeper+activemq集群实现高可用
安装zookeeper
安装acticemq(加压压缩包即可)
集群配置
第一步:在/usr/local/zookeeper/data下创建myid文件,文件内容为1。同理,其他虚拟机中也创建myid文件,内容分别为2和3
第二步:修改/usr/local/zookeeper/conf/zoo.cfg文件,加入以下内容
dataDir=/usr/local/zookeeper/data/ dataLogDir=/usr/local/zookeeper/logs server.1=192.168.112.128:2888:3888 server.2=192.168.112.129:2888:3888 server.3=192.168.112.130:2888:3888第三步:分别启动三台zookeeper(需要关闭防火墙)
service iptables stop/usr/local/zookeeper/bin/zkServer.sh start第四步:修改/usr/local/activemq/conf目录下的activemq.xml
vim /usr/local/activemq/conf/activemq.xml修改
brokerName=”activemq-cluster”将文件中持久化适配器改为
xml<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="192.168.112.128:2181,192.168.112.129:2181,192.168.112.130:2181" hostname="192.168.112.128" zkPath="/activemq/leveldb-stores"/> </persistenceAdapter>注意:其他虚拟机中修改时hostname需要改为对应的ip
第五步:启动mq服务
./activemq start
页面控制台: http://ip:8161 (监控)
列表各列信息含义如下:
Number Of Pending Messages :等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers :消费者 这个是消费者端的消费者数量
Messages Enqueued :进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued :出了队列的消息 可以理解为是消费这消费掉的数量。