ActiveMQ是由Apache基金会采用Java语言开发的一个开源的消息中间件,完美的遵循JMS规范。ActiveMQ易于实现高级场景,而且只需要付出低消耗。被誉为消息中间件的“瑞士军刀”。常用来处理高并发请求的流量削峰、事务处理。
本文,我们通过docker部署ActiveMQ,然后结合ActiveMQ的queue
及topic
模式,分别介绍在spring boot中的应用。
通过docker安装ActiveMQ
搜索可用的ActiveMQ
docker search activemq
我这里下载评星最高的webcenter/activemq
下载docker镜像
docker pull webcenter/activemq
运行docker
docker run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
检查是否运行成功
打开http://139.198.172.114:8161/
,出现以下页面
可以点击Manage ActiveMQ broker
,默认用户名、密码都是admin
Spring Boot整合ActiveMQ
ActiveMQ在实际企业开发中主要有两种模式:点对点模式(Queue)和发布订阅模式(Topic)
Queue模式
Queue模式即队列(先进先出),消息提供者生产消息发布到Queue中,然后消费者从Queue中取出,并消费消息。这里需要注意的是,消息被消费者消费之后,Queue不再存储,所以消息只能被消费一次。Queue支持存在多个消息消费者,但是对一个消息而言,只会有一个消费者可以消费。
创建Spring Boot项目并添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
定义常量Constants.java
public class Constants {
public final static String QUEUEMESSAGE = "queue";
}
添加配置类ActiveMQConfig.java
@Configuration
public class ActiveMQConfig {
@Bean
public Queue queue(){
return new ActiveMQQueue(Constants.QUEUEMESSAGE);
}
}
创建消息提供者
@RestController
public class ProducerController {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource
private Queue queue;
@RequestMapping("/sendQueueMessage/{msg}")
public String sendQueueMessage(@PathVariable("msg") String msg) {
this.jmsMessagingTemplate.convertAndSend(queue, msg);
return msg;
}
}
配置ActiveMQ
修改项目配置文件,配置ActiveMQ
spring.activemq.broker-url=tcp://139.198.172.114:61616
#接收所有消息
spring.activemq.packages.trust-all=true
创建消息消费者Consumer.java
@Component
@Slf4j
public class Consumer {
@JmsListener(destination = Constants.QUEUEMESSAGE)
public void receiveQueueMessage1(String message) {
if (null != message) {
log.info("收到queue报文1:" + message);
}
}
@JmsListener(destination = Constants.QUEUEMESSAGE)
public void receiveQueueMessage2(String message) {
if (null != message) {
log.info("收到queue报文2:" + message);
}
}
}
测试
我们访问http://localhost:8083/sendQueueMessage/123
,可以查看控制台输出信息
可以看到,虽然我们提供了两个消费者,但是实际上只有一个消费者消费成功。
Topic模式
消息提供者将消息发布到Topic中,同时有多个消费者(订阅者)消费该消息,与Queue模式不同,发布到Topic消息会被所有的订阅者消费。
Topic模式与Queue模式类似,我们这里基于Queue模式的代码进行修改。
修改配置类ActiveMQConfig.java
增加Topic的Bean,修改后代码如下
@Configuration
public class ActiveMQConfig {
@Bean
public Queue queue(){
return new ActiveMQQueue(Constants.QUEUEMESSAGE);
}
@Bean
public Topic topic(){
return new ActiveMQTopic(Constants.TOPICMESSAGE);
}
}
修改消息消费者
@RestController
public class ProducerController {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
@Resource
private Queue queue;
@Resource
private Topic topic;
@RequestMapping("/sendQueueMessage/{msg}")
public String sendQueueMessage(@PathVariable("msg") String msg) {
this.jmsMessagingTemplate.convertAndSend(queue, msg);
return msg;
}
@RequestMapping("/sendTopicMessage/{msg}")
public String sendTopicMessage(@PathVariable("msg") String msg) {
this.jmsMessagingTemplate.convertAndSend(topic, msg);
return msg;
}
}
修改消费者
@Component
@Slf4j
public class Consumer {
@JmsListener(destination = Constants.QUEUEMESSAGE)
public void receiveQueueMessage1(String message) {
if (null != message) {
log.info("收到queue报文1:" + message);
}
}
@JmsListener(destination = Constants.QUEUEMESSAGE)
public void receiveQueueMessage2(String message) {
if (null != message) {
log.info("收到queue报文2:" + message);
}
}
@JmsListener(destination = Constants.TOPICMESSAGE)
public void receiveTopicMessage1(String message) {
if (null != message) {
log.info("收到topic报文1:" + message);
}
}
@JmsListener(destination = Constants.TOPICMESSAGE)
public void receiveTopicMessage2(String message) {
if (null != message) {
log.info("收到topic报文2:" + message);
}
}
}
修改项目配置文件
通过spring.jms.pub-sub-domain=true
配置启用Topic。
server.port=8083
spring.activemq.broker-url=tcp://139.198.172.114:61616
#接收所有消息
spring.activemq.packages.trust-all=true
#配置使用topic
spring.jms.pub-sub-domain=true
测试
访问http://localhost:8083/sendTopicMessage/123
,查看控制台。
可以看到两个订阅者都输出了消息。
ActiveMQ监控
我们通过http://139.198.172.114:8161/admin/topics.jsp
,也可以查看服务器的信息
评论 (0)