SpringBoot 动态添加监听 rabbitMQ 队列
动态监听队列
需求
这里需要监听多个队列,而且运行途中可能会增加监听,或减少监听,因此实现需要采用SimpleMessageListenerContainer
类
步骤
添加gradle依赖
1
2implementation 'org.springframework.boot:spring-boot-starter-amqp'
compile 'cn.hutool:hutool-all:5.3.8'添加
application.properties
1
2
3
4
5
6
7
8
9spring.rabbitmq.host=10.10.10.11
spring.rabbitmq.port=14012
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=5000
spring.rabbitmq.countDownLatch=5
spring.rabbitmq.webport=14013
spring.rabbitmq.websocket-port=14014创建一个监听类
RbMQReceiverHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* 监听接收消息
*/
public class RbMQReceiverHandler implements MessageListener {
private final Logger log = LoggerFactory.getLogger(getClass());
public void onMessage(Message message) {
log.info("====接收到" + message.getMessageProperties().getConsumerQueue() + "队列的消息=====");
log.info(message.getMessageProperties().toString());
log.info(new String(message.getBody()));
}
}创建一个
RabbitMQConfig.java
配置文件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//huTool添加,才能用getBean
public class RabbitMQConfig {
RbMQReceiverHandler rbMQReceiverHandler;
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("test1_staff");
container.setMessageListener(rbMQReceiverHandler);
return container;
}
}添加一个动态添加队列的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RbController {
public String addQueue( { String queueNmae)
SimpleMessageListenerContainer container = SpringUtil.getBean(SimpleMessageListenerContainer.class);//获取实例
container.addQueueNames(queueNmae);
return "add " + queueNmae + " ok";
}
public String delQueue( { String queueNmae)
SimpleMessageListenerContainer container = SpringUtil.getBean(SimpleMessageListenerContainer.class);
container.removeQueueNames(queueNmae);
return "delete " + queueNmae + " ok";
}
}测试调用post 127.0.0.1:8080/queue 接口就能添加队列了,发送mq的消息没写测试方法,但是可以直接到mq的管理页面push一条消息进行测试
多线程监听队列
监听队列时,单线程