原文地址
mqtt序列文章
1、MQTT BROKER 技术选型
2、Windows安装EMQ X
3、EMQ X 快速入门
4、EMQ X 实践
一、mqtt简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅(publish-subscribe)模式的消息中间件通信协议,特别适合于资源有限的设备和网络带宽受限的环境。该协议由IBM公司在1998年设计并开发,现已成为物联网(IoT)、机器对机器(M2M)通信、移动应用以及实时数据传输等领域广泛应用的标准之一。主要用于物联网(IoT)、移动互联网以及远程传感器等低带宽、高延迟、不可靠网络环境下的通信。其主要特点包括:
- 轻量级协议:MQTT设计简洁,报文头部小,占用资源少,非常适合于低功耗设备和低带宽网络环境。
- 发布/订阅模式:MQTT采用发布/订阅模型,发布者将消息发送至某个主题(Topic),订阅者可以订阅自己关心的主题,当有新的消息发布到该主题时,订阅者会收到通知并获取消息内容。
- 多级QoS服务质量:MQTT支持三种服务质量等级,分别为0、1、2。其中,0级表示最多一次(At most once),即消息可能会丢失但能减少网络流量;1级表示至少一次(At least once),确保消息到达,但可能会重复;2级表示Exactly once,确保消息只送达一次且不丢失。
- 持久连接与心跳机制:客户端与服务器之间可建立长连接,保持在线状态。同时通过心跳(PING/PONG)机制来检测和维护连接的有效性。
- broker中转:MQTT的消息传输通过broker(消息代理)进行中转,broker负责接收、存储和转发消息,实现消息路由。
- 跨平台兼容性:MQTT协议具有良好的跨平台特性,可在各种操作系统和硬件平台上运行,便于不同设备间的数据交换。
- 安全可控:支持用户权限验证、SSL/TLS加密等机制,保证数据传输的安全性。
二、MQTT 的工作原理
透彻理解MQTT协议的工作原理,以下五个关键概念是基础:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。
MQTT 客户端:
- MQTT客户端可以是任何设备或应用程序,它们通过TCP/IP连接与MQTT Broker进行交互。
- 客户端可以作为发布者(Publisher),负责将消息发送到Broker;也可以作为订阅者(Subscriber),向Broker请求接收特定主题的消息。
MQTT Broker:
- MQTT Broker是整个MQTT架构的核心服务器组件,它处理所有的客户端建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。
- Broker维护着一个主题空间以及每个主题的订阅列表,当接收到发布者发来的话题消息时,会根据订阅关系分发给所有相关订阅者。
- 一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。
发布-订阅模式:
- MQTT采用的是发布-订阅模式而非点对点通信方式。
- 发布者将消息发送至某个特定的主题(Topic),并不关心谁会接收这些消息。
- 订阅者则告知Broker他们想要接收哪些主题上的消息,一旦有相应主题的消息到达,Broker就会把消息推送给订阅者。
主题(Topic):
- 主题是MQTT中的一种消息分类机制,它是一个字符串标识符,用于区分不同种类的消息流。
- 一个主题可以包含多个层次,通过 / 来区分层级,类似于 URL 路径“home/livingroom/temperature”,允许灵活订阅和发布细化到具体层级的消息。
- MQTT 主题支持以下两种通配符:+ 和 #。+:表示单层通配符,例如 a/+ 匹配 a/x 或 a/y。#:表示多层通配符,例如 a/# 匹配 a/x、a/b/c/d
服务质量(QoS):
- MQTT定义了三种QoS级别,用于保证消息传输的可靠性和效率。
- QoS 0(最多一次):消息可能丢失但传送最快;
- QoS 1(至少一次):确保消息至少被接收一次,可能会有重复;
- QoS 2(Exactly Once):确保消息仅且精确地送达一次,提供最高级别的可靠性。
三、MQTT 的工作流程
连接(Connect):客户端首先与Broker建立TCP连接,并在连接时可能携带认证信息(用户名、密码等)及会话参数。
订阅(Subscribe):订阅者向Broker发送订阅请求,明确表示对哪些主题感兴趣。
发布(Publish):发布者向Broker发送带有特定主题名的消息。消息可以携带有效载荷,内容不限,可以是JSON、XML或其他格式的数据。
消息分发:Broker接收到发布的消息后,根据主题匹配已有的订阅列表,将消息投递到所有订阅了对应主题的客户端。
服务质量(QoS):MQTT定义了三种服务质量等级,确保消息从发布者到达订阅者的方式不同,包括最多一次(QoS 0)、至少一次(QoS 1)和精确一次(QoS 2)。
心跳与保活(Keep Alive/Ping/Pong):为了检测连接的有效性,MQTT支持心跳机制,客户端可以通过发送PINGREQ报文来维持连接,Broker收到后回应PINGRESP报文。
四、入门教程
1、准备mqtt broker
选择emqx作为mqtt broker,并安装部署。
mqtt broker 选型文
Windows系统安装emqx
2、使用springboot 集成mqtt client
日常微服务开发基本框架springboot的,在我们的服务需要发生或者订阅消息,集成mqtt client
导入maven
SpringBoot版本:2.3.12.RELEASE
<!-- mqtt client -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置文件 yaml
mqtt:
enable: true
hostUrl: tcp://127.0.0.1:1883
username: dev
password: 123456
client-id: MQTT-CLIENT-DEV
cleanSession: true
reconnect: true
timeout: 100
keepAlive: 100
qos: 0
配置类MqttConfig
package com.gitee.xmhzzz.mqtt.quick.config;
import lombok.Data;
@Data
public class MqttConfig {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户端Id,同一台服务器下,不允许出现重复的客户端id
*/
private String clientId;
/**
* 超时时间
*/
private int timeout;
/**
* 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
*/
private int keepAlive;
/**
* 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
*/
private Boolean cleanSession;
/**
* 是否断线重连
*/
private Boolean reconnect;
/**
* 启动的时候是否关闭mqtt
*/
private Boolean enable;
/**
* 连接方式
*/
private Integer qos;
}
MQTT客户端 MyMqttClient
package com.gitee.xmhzzz.mqtt.quick.client;
import com.gitee.xmhzzz.mqtt.quick.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @ClassName MyMqttClient
* @Description
* @Author wzq
* @Date 2024/1/14 14:02
* @Version 1.0
*/
public class MyMqttClient implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MyMqttClient.class);
private MqttConfig mqttConfig;
public static MqttClient client;
public MyMqttClient(MqttConfig mqttConfig) {
this.mqttConfig = mqttConfig;
}
private static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MyMqttClient.client = client;
}
/**
* 客户端连接
*/
public void connect() {
MqttClient client;
try {
client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId(),
new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
options.setConnectionTimeout(mqttConfig.getTimeout());
options.setKeepAliveInterval(mqttConfig.getKeepAlive());
options.setAutomaticReconnect(mqttConfig.getReconnect());
options.setCleanSession(mqttConfig.getCleanSession());
MyMqttClient.setClient(client);
// 设置回调
client.setCallback(this);
client.connect(options);
} catch (Exception e) {
logger.error("MyMqttClient connect error,message:{}", e.getMessage());
e.printStackTrace();
}
}
/**
* 重新连接
*/
public void reconnection() {
try {
client.connect();
} catch (MqttException e) {
logger.error("MyMqttClient reconnection error,message:{}", e.getMessage());
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
logger.info("========================【开始订阅主题:" + topic + "】========================");
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
logger.error("MyMqttClient subscribe error,message:{}", e.getMessage());
}
}
/**
* 发布消息
*
* @param retained 是否保留
* @param topic 主题,格式: server:${env}:report:${topic}
* @param content 消息内容
*/
public void publish(boolean retained, String topic, String content) {
MqttMessage message = new MqttMessage();
message.setQos(mqttConfig.getQos());
message.setRetained(retained);
message.setPayload(content.getBytes());
try {
client.publish(topic, message);
} catch (MqttException e) {
logger.error("MqttSendClient publish error,message:{}", e.getMessage());
}
}
/**
* 取消订阅某个主题
*
* @param topic
*/
public void unsubscribe(String topic) {
logger.info("========================【取消订阅主题:" + topic + "】========================");
try {
client.unsubscribe(topic);
} catch (MqttException e) {
logger.error("MyMqttClient unsubscribe error,message:{}", e.getMessage());
}
}
/**
* 客户端断开后触发
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("连接断开,可以重连");
if (client == null || client.isConnected()) {
logger.info("【emqx重新连接】....................................................");
this.reconnection();
}
}
/**
* 客户端收到消息触发
*
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("【接收消息主题】:" + topic);
logger.info("【接收消息Qos】:" + mqttMessage.getQos());
logger.info("【接收消息内容】:" + new String(mqttMessage.getPayload()));
// int i = 1/0;
}
/**
* 发布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
logger.info("向主题【" + topic + "】发送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
logger.info("【消息内容】:" + s);
} catch (Exception e) {
logger.error("MyMqttCallback deliveryComplete error,message:{}", e.getMessage());
}
}
/**
* 连接emq服务器后触发
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
logger.info("============================= 客户端【" + client.getClientId() + "】连接成功!=============================");
// 以/#结尾表示订阅所有以test开头的主题
// 订阅所有机构主题
this.subscribe("mqtt/quick/msg-send", 0);
}
}
测试类MqttController
package com.gitee.xmhzzz.mqtt.quick.controller;
import com.gitee.xmhzzz.mqtt.quick.client.MyMqttClient;
import com.gitee.xmhzzz.mqtt.quick.req.PublishReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @ClassName MqttController
* @Description
* @Author wzq
* @Date 2024/1/14 15:40
* @Version 1.0
*/
@Slf4j
@RequestMapping("/mqtt")
@RestController
public class MqttController {
@Autowired
private MyMqttClient myMqttClient;
@PostMapping(value = "/publish")
public String publishTopic(@RequestBody PublishReq req) {
log.info("topic: {}",req.getTopic());
log.info("message: {}", req.getSendMessage());
this.myMqttClient.publish(false, req.getTopic(), req.getSendMessage());
return "topic:" + req.getTopic() + "\nmessage:" + req.getSendMessage();
}
}
Bean管理类 MqttConfiguration
MyMqttClient注入Spring容器中,并连接EMQX
package com.gitee.xmhzzz.mqtt.quick.config;
import com.gitee.xmhzzz.mqtt.quick.client.MyMqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName MqttConfigration
* @Description
* @Author wzq
* @Date 2024/1/14 14:52
* @Version 1.0
*/
@Configuration
@ConditionalOnProperty(value = "mqtt.enable", havingValue = "true")
public class MqttConfiguration {
@Bean
@ConfigurationProperties(prefix = "mqtt")
public MqttConfig mqttConfig() {
return new MqttConfig();
}
/**
* 订阅mqtt
*
* @return
*/
@Bean
public MyMqttClient getMqttPushClient(MqttConfig mqttConfig) {
MyMqttClient myMqttClient = new MyMqttClient(mqttConfig);
myMqttClient.connect();
return myMqttClient;
}
}
3、启动服务
发送message
监听topic
五、其它
mqtt 入门介绍https://www.emqx.com/zh/blog/the-easiest-guide-to-getting-started-with-mqtt
emqx 官方文档https://www.emqx.io/docs/zh/latest/
Windows安装emqx
gitee源码 https://gitee.com/xmhzzz/micro-practice/tree/master/mqtt-quick-start
评论区