消息中间件之ActiveMQ
首先先看一个例子:
完成一个功能需要三步调用, step1耗时15ms, step2耗时100ms,step3耗时300ms。
如果是同步调用 , 返回到用户的时间是415ms。
这样肯定是响应较慢的,用户体验不好。
如果在step1处理完后,已经接收到用户信息 。然后直接返回, 后面耗时长的操作在后台默默的做。
这时用户获得的体验是15ms就得到了响应。
图中ack是在step1处理完数据后, 将数据放到消息中间件中得到的回执。用来确认消息已经放到消息中间件中。
其中, MQ
有两个作用, 存储
和通知
。
将发来的消息存储下来, 通知对这个消息感兴趣的一方来取消息。
消息有两种形式, topic和queue。
queue的形式是,消息被消费完之后, 就被删除了。其他消费者获取不到这个消息。
topic就是, 消费者等待消息的到来, 消息来了所有的消费者都能收到消息, 如果消息已经过了消费者才来, 那就不会收到消息。
消息中间件应用场景
异步通信
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
过载保护
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
数据流处理
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。
常用消息队列比较
特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
生产者消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布订阅模式 | 支持 | 支持 | 支持 | 支持 |
请求回应模式 | 支持 | 支持 | 不支持 | 不支持 |
Api完备性 | 高 | 高 | 高 | 高 |
多语言支持 | 支持 | 支持 | java | 支持 |
单机吞吐量 | 万级 | 万级 | 万级 | 十万级 |
消息延迟 | 无 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息丢失 | 低 | 低 | 理论上不会丢失 | 理论上不会丢失 |
文档的完备性 | 高 | 高 | 高 | 高 |
提供快速入门 | 有 | 有 | 有 | 有 |
社区活跃度 | 高 | 高 | 有 | 高 |
商业支持 | 无 | 无 | 商业云 | 商业云 |
JMS中的一些角色
Broker
消息服务器,作为server提供消息核心服务
provider
生产者
消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
Consumer
消费者
消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:
- 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
- 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
p2p
基于点对点的消息模型
消息生产者生产消息发送到 queue 中,然后消息消费者从 queue 中取出并且消费消息。 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消 息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它 的则不能消费此消息了。 当消费者不存在时,消息会一直保存,直到有消费消费
[](https://github.com/bjmashibing/InternetArchitect/blob/master/16 一期 消息中间件/ActiveMQ/images/image-20200110192535698.png)
pub/sub
[](https://github.com/bjmashibing/InternetArchitect/blob/master/16 一期 消息中间件/ActiveMQ/images/image-20200110192613518.png)
基于订阅/发布的消息模型
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消 息。 和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。 当生产者发布消息,不管是否有消费者。都不会保存消息 一定要先有消息的消费者,后有消息的生产者。
PTP 和 PUB/SUB 简单对
1 | Topic | Queue |
---|---|---|
Publish Subscribe messaging 发布 订阅消息 | Point-to-Point 点对点 | |
有无状态 | topic 数据默认不落地,是无状态的。 | Queue 数据默认会在 mq 服 务器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存储。 |
完整性保障 | 并不保证 publisher 发布的每条数 据,Subscriber 都能接受到。 | Queue 保证每条数据都能 被 receiver 接收。消息不超时。 |
消息是否会丢失 | 一般来说 publisher 发布消息到某 一个 topic 时,只有正在监听该 topic 地址的 sub 能够接收到消息;如果没 有 sub 在监听,该 topic 就丢失了。 | Sender 发 送 消 息 到 目 标 Queue, receiver 可以异步接收这 个 Queue 上的消息。Queue 上的 消息如果暂时没有 receiver 来 取,也不会丢失。前提是消息不 超时。 |
消息发布接 收策略 | 一对多的消息发布接收策略,监 听同一个topic地址的多个sub都能收 到 publisher 发送的消息。Sub 接收完 通知 mq 服务器 | 一对一的消息发布接收策 略,一个 sender 发送的消息,只 能有一个 receiver 接收。 receiver 接收完后,通知 mq 服务器已接 收,mq 服务器对 queue 里的消 息采取删除或其他操作。 |
Queue
队列存储,常用与点对点消息模型
默认只能由唯一的一个消费者处理。一旦处理消息删除。
Topic
主题存储,用于订阅/发布消息模型
主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处 理的业务场景中可使用。
Queue/Topic都是 Destination 的子接口
ConnectionFactory
连接工厂,jms中用它创建连接
连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。
Connection
JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。
Destination
消息的目的地
目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。 点对点消息传递域的特点如下:
- 每个消息只能有一个消费者。
- 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。
发布/订阅消息传递域的特点如下:
- 每个消息可以有多个消费者。
- 生产者和消费者之间有时间上的相关性。
- 订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求 。持久订阅允许消费者消费它在未处于激活状态时发送的消息。 在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
Session
JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
消息可靠性机制
确认 JMS消息
只有在被确认之后,才认为已经被成功地消费了。
消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。
在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
- Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
- Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
- Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。
持久性
JMS 支持以下两种消息提交模式:
- PERSISTENT。指示JMS Provider持久保存消息,以保证消息不会因为JMS Provider的失败而丢失。
- NON_PERSISTENT。不要求JMS Provider持久保存消息。
优先级
可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。
消息过期
可以设置消息在一定时间后过期,默认是永不过期。
临时目的地
可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。
持久订阅
首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。 JMS Provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。JMS Provider会象客户发送客户处于非激活状态时所发布的消息。 持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。
本地事务
在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。 需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。 需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。
常用API
事务
session.commit();
session.rollback();
用来提交/回滚事务
Purge
清理消息
签收模式
签收代表接收端的session已收到消息的一次确认,反馈给broker
ActiveMQ支持自动签收与手动签收
Session.AUTO_ACKNOWLEDGE
当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。
Session.CLIENT_ACKNOWLEDGE
客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
Session.DUPS_OK_ACKNOWLEDGE
Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。
持久化
默认持久化是开启的
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
优先级
可以打乱消费顺序
producer.setPriority
配置文件需要指定使用优先级的目的地
<policyEntry queue="queue1" prioritizedMessages="true" />
消息超时/过期
producer.setTimeToLive
设置了消息超时的消息,消费端在超时后无法在消费到此消息。
给消息设置一个超时时间 -> 死信队列 -> 拿出来 -> 重发
死信
此类消息会进入到ActiveMQ.DLQ
队列且不会自动清除,称为死信
此处有消息堆积的风险
修改死信队列名称
<policyEntry queue="f" prioritizedMessages="true" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
让非持久化的消息也进入死信队列
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" processNonPersistent="true" />
processNonPersistent=”true”
过期消息不进死信队列
<individualDeadLetterStrategy processExpired="false" />
独占消费者
Queue queue = session.createQueue("xxoo?consumer.exclusive=true");
还可以设置优先级
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");
消息类型
object
发送端
Girl girl = new Girl("qiqi",25,398.0);
Message message = session.createObjectMessage(girl);
接受端
if(message instanceof ActiveMQObjectMessage) {
Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject();
System.out.println(girl);
System.out.println(girl.getName());
}
如果遇到此类报错
Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
... 1 more
需要添加信任
connectionFactory.setTrustedPackages(
new ArrayList<String>(
Arrays.asList(
new String[]{
Girl.class.getPackage().getName()
}
)
)
);
bytesMessage
发送端
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("str".getBytes());
bytesMessage.writeUTF("哈哈");
接受端
if(message instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)message;
byte[] b = new byte[1024];
int len = -1;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序
bm.readBoolean()
bm.readUTF()
写入文件
FileOutputStream out = null;
try {
out = new FileOutputStream("d:/aa.txt");
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
byte[] by = new byte[1024];
int len = 0 ;
try {
while((len = bm.readBytes(by))!= -1){
out.write(by,0,len);
}
} catch (Exception e1) {
e1.printStackTrace();
}
MapMessage
发送端
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","lucy");
mapMessage.setBoolean("yihun",false);
mapMessage.setInt("age", 17);
producer.send(mapMessage);
接收端
Message message = consumer.receive();
MapMessage mes = (MapMessage) message;
System.out.println(mes);
System.out.println(mes.getString("name"));
消息发送原理
同步与异步
开启事务 | 关闭事务 | |
---|---|---|
持久化 | 异步 | 同步 |
非持久化 | 异步 | 异步 |
我们可以通过以下几种方式来设置异步发送:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616"
);
// 2.获取一个向ActiveMQ的连接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setUseAsyncSend(true);
消息堆积
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。
brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
destinationUri中设置: myQueue?producer.windowSize=1048576
延迟消息投递
首先在配置文件中开启延迟和调度
schedulerSupport=”true”
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延迟发送
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);
带间隔的重复发送
long delay = 10 * 1000;
long period = 2 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
createProducer.send(message);
Cron表达式定时发送
也支持Cron表达式。
监听器
可以使用监听器来处理消息接收
consumer.setMessageListener(new MyListener());
需要实现接口MessageListener
public class MyListener implements MessageListener {
public void onMessage(Message message) {
// TODO Auto-generated method stub
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("xxoo" + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
当收到消息后会调起onMessage方法
消息过滤
消息发送
MapMessage msg1 = session.createMapMessage();
msg1.setString("name", "qiqi");
msg1.setString("age", "18");
msg1.setStringProperty("name", "qiqi");
msg1.setIntProperty("age", 18);
MapMessage msg2 = session.createMapMessage();
msg2.setString("name", "lucy");
msg2.setString("age", "18");
msg2.setStringProperty("name", "lucy");
msg2.setIntProperty("age", 18);
MapMessage msg3 = session.createMapMessage();
msg3.setString("name", "qianqian");
msg3.setString("age", "17");
msg3.setStringProperty("name", "qianqian");
msg3.setIntProperty("age", 17);
消息接收
String selector1 = "age > 17";
String selector2 = "name = 'lucy'";
MessageConsumer consumer = session.createConsumer(queue,selector2);