Spring引入Mqtt消息组件
基础概念
*QoS*
QoS(Quality of Service)指消息传输的服务质量。分别可在消息发送端和消息消费端设置。
- 发送端的QoS设置:影响发送端发送消息到云消息队列 MQTT 版的传输质量。
- 消费端的QoS设置:影响云消息队列 MQTT 版服务端投递消息到消费端的传输质量。
QoS包括以下级别:
- QoS0:代表最多分发一次。
- QoS1:代表至少达到一次。
- QoS2:代表仅分发一次。
*cleanSession*
- cleanSession标志是MQTT协议中对一个消费者客户端建立TCP连接后是否关心之前状态的定义,与消息发送端的设置无关。具体语义如下:
- cleanSession=true:消费者客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息。
- cleanSession=false:消费者客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效。
- cleanSession标志是MQTT协议中对一个消费者客户端建立TCP连接后是否关心之前状态的定义,与消息发送端的设置无关。具体语义如下:
QoS和cleanSession搭配使用时需注意以下几点:
- MQTT要求每个客户端每次连接时的cleanSession标志必须固定,不允许动态变化,否则会导致离线消息的判断有误。
- MQTT目前对外QoS2消息不支持非cleanSession,如果客户端以QoS2方式订阅消息,即使设置cleanSession=false也不会生效。
- P2P消息的cleanSession判断以接收方客户端的配置为准。
消费端QoS和cleanSession的不同组合产生的结果如QoS和cleanSession的组合关系所示。
QoS级别 | cleanSession=true | cleanSession=false |
---|---|---|
QoS0 | 无离线消息,在线消息只尝试推一次。 | 无离线消息,在线消息只尝试推一次。 |
QoS1 | 无离线消息,在线消息保证可达。 | 有离线消息,所有消息保证可达。 |
QoS2 | 无离线消息,在线消息保证可达且只接收一次。 | 暂不支持。 |
spring接入mqtt
添加依赖
1
2
3
4<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>添加mq配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MqttConfigProperties {
String host;
String user;
String password;
String clientId;
String topic;
String sendTopic1;
String sendTopic2;
Integer qos;
Integer timeout;
Integer keepalive;
}对应的配置文件
1
2
3
4
5
6
7
8
9mqtt:
host: tcp://172.16.1.1:1883
user: mqtt_user_test
password: mqtt_user_test
qos: 0
clientId: sys_manager
topic: sys/position
timeout: 10
keepalive: 60注入配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class MqttConfiguration {
private MqttConfigProperties mqttConfigProperties;
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
/***
* mqtt连接配置
*/
public MqttPahoClientFactory mqttPahoClientFactory() {
DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttConfigProperties.getUser());
mqttConnectOptions.setPassword(mqttConfigProperties.getPassword().toCharArray());
mqttConnectOptions.setConnectionTimeout(mqttConfigProperties.getTimeout());
mqttConnectOptions.setKeepAliveInterval(mqttConfigProperties.getKeepalive());
mqttConnectOptions.setServerURIs(new String[]{mqttConfigProperties.getHost()});
mqttConnectOptions.setCleanSession(false);
defaultMqttPahoClientFactory.setConnectionOptions(mqttConnectOptions);
return defaultMqttPahoClientFactory;
}
/**
* 出站配置
*/
public MessageHandler messageOutHandler() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "outChannel", mqttPahoClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setDefaultTopic(mqttConfigProperties.getTopic());
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
defaultPahoMessageConverter.setPayloadAsBytes(true);
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
/**
* 入站配置
*/
public MessageProducer inbound() {
String uuid = UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttConfigProperties.getClientId() + uuid, mqttPahoClientFactory(), mqttConfigProperties.getTopic());
adapter.setCompletionTimeout(5000);
adapter.setQos(0);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 处理入站消息
*/
public MessageHandler handler() {
return message -> {
log.info("message payload {}", String.valueOf(message.getPayload()));
};
}
}发送消息
1
2
3
4
public interface MqttGateway {
void sentToMqtt(String data);
}
spring 调用emqx的api
点击左侧系统设置菜单下的 API 密钥,可以来到 API 密钥页面。如果需要 API 密钥来创建一些脚本调用 HTTP API,可以在此页面进行创建获取操作。点击页面右上角创建按钮打开创建 API 密钥弹框,填写 API 密钥相关数据,如果到期时间未填写 API 密钥将永不过期,点击确定提交数据,提交成功后页面上将提供此次创建的 API 密钥的 API Key 和 Secret Key,其中 Secret Key 后续将不再显示,用户需立即将 API Key 和 Secret Key 保存至安全的地方;保存数据完毕可点击关闭按钮关闭弹框。
在 API 密钥页面上,您可以按照以下步骤生成用于访问 HTTP API 的 API 密钥和 Secret key。
单击页面右上角的**+ 创建**按钮,弹出创建 API 密钥的对话框。
在创建 API 密钥对话框上,配置 API 密钥的详细信息。
如果到期时间文本框留空,API 密钥将永不过期。
单击确认按钮,API 密钥和密钥将被创建并显示在创建成功对话框中。
代码示例:
1 | import cn.com.iexxk.config.MqttConfigProperties; |