MQTT实战指南:使用EMQX、MQTTX与ESP32搭建物联网通信系统

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种基于“发布/订阅”范式的轻量级消息传输协议。它专为计算资源受限、带宽有限以及网络连接不稳定的设备设计,具有报文结构简洁、功耗低、传输可靠等特点。

在MQTT的架构中,主要包含三个核心角色:

  1. ​Broker(服务端/代理):负责接收、过滤和分发所有消息的核心枢纽。
  2. ​Publisher(发布者)​:发送消息的客户端(如传感器、ESP32)。
  3. ​Subscriber(订阅者):接收特定主题消息的客户端(如手机APP、监控后台)。

这种机制实现了设备之间的解耦通信,即发布者不需要知道订阅者的存在,只需将消息发送给Broker,由Broker根据“主题(Topic)”分发给对应的订阅者。

第一步:搭建 MQTT 服务端 (EMQX)

我们将使用开源且高性能的 EMQX 作为 MQTT Broker。这里以 Windows 系统为例进行快速部署(Linux/MacOS 也支持类似方式或使用 Docker 安装):

  1. 下载与解压​:前往 EMQX 官网下载最新的 Windows 版本压缩包(zip格式),并将其解压到一个不含中文或空格的路径下(例如 C:\emqx)。

  2. 启动服务​:以管理员身份打开命令提示符(CMD),进入 bin 目录并执行启动命令:

    cd C:\emqx\bin
    emqx start
    

    linux下安装:

#下载
wget https://www.emqx.com/zh/downloads/enterprise/6.2.0/emqx-enterprise-6.2.0-ubuntu24.04-amd64.deb

#安装
sudo apt install ./emqx-enterprise-6.2.0-ubuntu24.04-amd64.deb

#启动
sudo systemctl start emqx

启动成功后,你可以通过浏览器访问 http://localhost:18083 进入 EMQX 的 Dashboard(默认账号为 admin,密码为 public)。
3. ​配置账号密码认证​:为了满足安全连接的需求,我们需要开启密码认证。

  • 登录 Dashboard 后,在左侧菜单栏找到“访问控制” -> “认证”。
  • 点击“创建”,选择“内置数据库”作为数据源。
  • 添加一个用户,例如设置用户名为 esp32_user,密码为 esp32_password
  • (可选)为了强制要求所有连接都必须带密码,可以在配置文件 etc/emqx.conf 中将 allow_anonymous = false,然后重启 EMQX。

第二步:安装桌面调试工具 (MQTTX)

MQTTX 是一款跨平台的现代化 MQTT 5.0 客户端工具箱,界面直观,非常适合用来模拟设备和调试。

  1. 下载安装​:前往 MQTTX 官网,根据你的操作系统(Windows/macOS/Linux)下载并安装最新版本。
  2. 建立连接测试​:
    • 打开 MQTTX,点击左上角“新建连接”。
    • 名称随意填写,Host 填入 127.0.0.1(本地服务器),Port 保持默认的 1883
    • 在下方的高级设置或常规设置中,填入刚才在 EMQX 中创建的用户名 esp32_user 和密码 esp32_password
    • 点击右上角“连接”。如果显示绿色在线状态,说明你的 EMQX 服务端已准备就绪。

第三步:ESP32 客户端代码编写

接下来,我们将使用 Arduino IDE 对 ESP32 进行编程,使其通过 WiFi 连接到本地的 EMQX 服务器,并与 MQTTX 进行互相通讯。

准备工作:
在 Arduino IDE 的“库管理器”中搜索并安装 PubSubClient 库(作者:Nick O'Leary)。

完整示例代码:
请将代码中的 WiFi 名称、密码以及 MQTT 服务器的 IP 地址替换为你实际环境的参数。

#include <WiFi.h>
#include <PubSubClient.h>

// ================= 配置区域 =================
const char* ssid = "YOUR_WIFI_SSID";        // 替换为你的 WiFi 名称
const char* password = "YOUR_WIFI_PASSWORD"; // 替换为你的 WiFi 密码

const char* mqtt_server = "192.168.x.x";     // 替换为运行 EMQX 电脑的局域网 IP 地址
const int mqtt_port = 1883;
const char* mqtt_user = "esp32_user";        // EMQX 中创建的账号
const char* mqtt_password = "esp32_password";// EMQX 中创建的密码
// ===========================================

WiFiClient espClient;
PubSubClient client(espClient);

// 定义通讯的主题
const char* subscribeTopic = "home/livingroom/light"; // ESP32 订阅的主题(接收指令)
const char* publishTopic = "home/livingroom/status";  // ESP32 发布的主题(上报状态)

// 接收到 MQTT 消息时的回调函数
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("收到消息 [");
  Serial.print(topic);
  Serial.print("] ");
  
  // 将接收到的二进制 payload 转换为字符串打印
  String message;
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);

  // 简单的逻辑处理:如果收到 "ON",点亮板载 LED;收到 "OFF",熄灭 LED
  if (message == "ON") {
    digitalWrite(LED_BUILTIN, HIGH);
    client.publish(publishTopic, "Light is ON"); // 回复状态给 MQTTX
  } else if (message == "OFF") {
    digitalWrite(LED_BUILTIN, LOW);
    client.publish(publishTopic, "Light is OFF");
  }
}

// 重新连接 MQTT 服务器的函数
void reconnect() {
  while (!client.connected()) {
    Serial.print("正在尝试连接 MQTT 服务器...");
    // 使用 Client ID、用户名和密码进行连接
    if (client.connect("ESP32_Client", mqtt_user, mqtt_password)) {
      Serial.println("连接成功!");
      // 连接成功后订阅主题
      client.subscribe(subscribeTopic);
    } else {
      Serial.print("连接失败,rc=");
      Serial.print(client.state());
      Serial.println(" 5秒后重试...");
      delay(5000);
    }
  }
}

void setup() {
  pinMode(LED_BUILTIN, OUTPUT);
  Serial.begin(115200);
  
  // 1. 连接 WiFi
  Serial.print("正在连接 WiFi: ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("\nWiFi 已连接,IP 地址: ");
  Serial.println(WiFi.localIP());

  // 2. 配置 MQTT 服务器和回调函数
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  // 如果连接断开,自动重连
  if (!client.connected()) {
    reconnect();
  }
  // 循环处理 MQTT 消息
  client.loop();

  // 演示:每隔 10 秒主动上报一次心跳状态
  static unsigned long lastMsg = 0;
  if (millis() - lastMsg > 10000) {
    lastMsg = millis();
    client.publish(publishTopic, "I am alive!");
    Serial.println("主动上报状态: I am alive!");
  }
}

MicroPython版

准备工作:

  1. 确保你的 ESP32 已经烧录了 MicroPython 固件。
  2. 推荐使用 Thonny IDE 或 VSCode (配合 Pymakr 插件) 来编写和上传代码。

完整示例代码:
请将代码中的 WiFi 名称、密码以及 MQTT 服务器的 IP 地址替换为你实际环境的参数。

import network
import time
from umqtt.simple import MQTTClient

# ================= 配置区域 =================
WIFI_SSID = "YOUR_WIFI_SSID"           # 替换为你的 WiFi 名称
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"   # 替换为你的 WiFi 密码

MQTT_SERVER = "192.168.x.x"            # 替换为运行 EMQX 电脑的局域网 IP 地址
MQTT_PORT = 1883
MQTT_USER = "esp32_user"               # EMQX 中创建的账号
MQTT_PASSWORD = "esp32_password"       # EMQX 中创建的密码

CLIENT_ID = "ESP32_Client_Py"          # 自定义客户端ID
SUBSCRIBE_TOPIC = b"home/livingroom/light"  # ESP32 订阅的主题(接收指令,注意加b转为bytes)
PUBLISH_TOPIC = "home/livingroom/status"    # ESP32 发布的主题(上报状态)
# ===========================================

# 初始化板载LED引脚(不同开发板引脚可能不同,ESP32通常为GPIO2)
led = machine.Pin(2, machine.Pin.OUT)

# 1. 连接 WiFi
def connect_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('正在连接 WiFi...')
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        while not wlan.isconnected():
            time.sleep(1)
    print('WiFi 已连接,IP 地址:', wlan.ifconfig()[0])

# 2. 接收到 MQTT 消息时的回调函数
def mqtt_callback(topic, msg):
    print(f"收到消息 [主题: {topic}] 内容: {msg}")
    
    # 简单的逻辑处理:如果收到 "ON",点亮 LED;收到 "OFF",熄灭 LED
    if msg == b"ON":
        led.value(1)  # 点亮
        client.publish(PUBLISH_TOPIC, "Light is ON")  # 回复状态给 MQTTX
        print("执行动作: 开灯")
    elif msg == b"OFF":
        led.value(0)  # 熄灭
        client.publish(PUBLISH_TOPIC, "Light is OFF")
        print("执行动作: 关灯")

# 3. 主程序入口
if __name__ == "__main__":
    # 连接网络
    connect_wifi()
    
    # 建立 MQTT 客户端并传入用户名和密码
    client = MQTTClient(CLIENT_ID, MQTT_SERVER, port=MQTT_PORT, user=MQTT_USER, password=MQTT_PASSWORD, keepalive=60)
    client.set_callback(mqtt_callback)  # 设置消息回调函数
    
    try:
        client.connect()
        print("成功连接到 MQTT 服务器")
        client.subscribe(SUBSCRIBE_TOPIC)  # 订阅主题
        
        # 进入主循环,持续监听消息
        while True:
            client.check_msg()  # 非阻塞检查是否有新消息
            
            # 演示:每隔 10 秒主动上报一次心跳状态
            # 在实际生产中建议使用定时器,这里为了简化直接用时间差判断
            last_msg_time = time.time()
            if time.time() - last_msg_time > 10: 
                client.publish(PUBLISH_TOPIC, "I am alive!")
                print("主动上报状态: I am alive!")
                last_msg_time = time.time()
                
            time.sleep(1)
            
    except Exception as e:
        print("发生错误:", e)
        client.disconnect()

第四步:实现互相通讯

当 ESP32 烧录代码并成功启动后,我们可以通过以下步骤验证它们是否在进行双向通讯:

  1. 查看 ESP32 日志​:打开 Arduino IDE 的串口监视器,你应该能看到 ESP32 成功连接 WiFi 和 MQTT 服务器的日志。
  2. MQTTX 控制 ESP32​:
    • 在 MQTTX 中,订阅主题 home/livingroom/status(这是 ESP32 上报的主题)。
    • 向主题 home/livingroom/light(这是 ESP32 监听的指令主题)发送一条内容为 ON 的消息。
    • 此时,你会看到 ESP32 的板载 LED 被点亮,同时串口监视器打印出收到的消息,并且 MQTTX 收到了来自 ESP32 的回复:“Light is ON”。
  3. 观察自动上报​:即使你不发送指令,MQTTX 也会每隔 10 秒收到一条 ESP32 主动发来的 “I am alive!” 消息。

通过以上步骤,你就成功利用 EMQX 作为中枢,打通了桌面端 MQTTX 与硬件端 ESP32 之间的双向实时通讯链路。

评论 (0)

暂无评论