首先明确概念,什么是MQTT?
MQTT是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的物联网(IoT)通信协议,在带宽有限、网络不稳定的环境下,实现低功耗、低延迟的设备间通信,是物联网领域的核心通信协议之一(被 OASIS 标准化,广泛支持于 AWS IoT、Azure IoT、阿里云 IoT 等平台)。
核心设计目标
MQTT 的所有特性均围绕 “适配物联网场景” 展开,核心目标可概括为 4 点:
轻量级
协议头部极小(固定头部仅 2 字节,可变头部和负载按需添加),设备侧客户端代码体积可压缩至几十 KB,适配单片机、传感器等资源受限设备(如 Arduino、ESP8266)。
低带宽 / 低功耗
减少数据传输量(无冗余字段),支持 “休眠 - 唤醒” 模式,适合 2G/4G/NB-IoT 等窄带或按流量计费的网络。
高可靠性
通过 “服务质量(QoS)” 机制保障消息不丢失、不重复,应对物联网常见的 “网络波动、设备离线” 场景。
解耦通信
基于 “发布 / 订阅” 模式,设备(发布者)和平台(订阅者)无需直接交互,甚至无需知道对方的 IP / 端口,降低系统耦合度。
核心架构:发布 / 订阅模式
MQTT 不采用 “点对点(P2P)” 直接通信,而是通过中间代理(Broker) 实现消息转发,架构包含 3 个核心角色
角色 | 功能 | 典型示例 |
---|---|---|
发布者(Publisher) | 发送消息的设备 / 应用,无需知道谁会接收消息,仅需指定消息的 “主题(Topic)”。 | 温湿度传感器、智能门锁 |
订阅者(Subscriber) | 接收消息的设备 / 应用,需提前 “订阅” 感兴趣的 “主题(Topic)”,仅接收对应消息。 | 物联网平台、手机 APP、PLC |
代理(Broker) | 核心中间件,负责接收发布者的消息,根据 “主题” 匹配订阅者,并将消息转发给订阅者;同时管理设备连接、会话、QoS 等逻辑。 | EMQX、Mosquitto、AWS IoT Core |
关键概念:主题(Topic)
“主题” 是 MQTT 消息的 “分类标签”,类似文件系统的路径(用/分隔层级),用于实现消息的精准路由。例如:
传感器主题:sensor/room1/temperature(1 号房间温度传感器)、sensor/room2/humidity(2 号房间湿度传感器);
设备控制主题:device/light/room1/control(控制 1 号房间灯光)。
这里借助的MQTT的插件是MQTTnet+拓展MQTTnet.Extensions.ManagedClient,这个拓展提供了一个托管的 MQTT 客户端实现,简化了 MQTT 客户端的使用并增强了可靠性。它自动处理连接管理、重连逻辑、消息队列等功能,非常适合需要稳定 MQTT 通信的应用程序。
https://download.csdn.net/download/weixin_44347839/91777855
using UnityEngine;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;
using MQTTnet.Packets;
using System.Linq;
using MyFrameworkPro;
public class MQTTClient : MonoSingleton<MQTTClient>
{
[Header("MQTT Broker Settings")]
[Tooltip("MQTT服务器地址")]
public string brokerAddress = "test.mosquitto.org";
[Tooltip("MQTT服务器端口")]
public int brokerPort = 1883;
[Tooltip("客户端ID,为空则自动生成")]
public string clientId = "";
[Tooltip("用户名,无则留空")]
public string username = "";
[Tooltip("密码,无则留空")]
public string password = "";
[Header("Connection Settings")]
[Tooltip("是否使用TLS加密连接")]
public bool useTls = false;
[Tooltip("自动重连间隔(秒)")]
public int reconnectInterval = 5;
[Tooltip("心跳包间隔(秒)")]
private int KeepAlive = 60;
// 管理型MQTT客户端
private IManagedMqttClient _mqttClient;
// 连接状态
public bool IsConnected => _mqttClient?.IsConnected ?? false;
// 消息队列,用于在主线程处理接收到的消息
private readonly Queue<MqttApplicationMessageReceivedEventArgs> _messageQueue =
new Queue<MqttApplicationMessageReceivedEventArgs>();
// 事件定义
public event Action OnConnected;
public event Action OnDisconnected;
public event Action<string> OnLog;
public event Action<string, string> OnMessageReceived; // 主题, 消息内容
private void Awake()
{
// 确保场景中只存在一个MQTT客户端实例
if (FindObjectsOfType<MQTTClient>().Length > 1)
{
Destroy(gameObject);
}
else
{
DontDestroyOnLoad(gameObject);
}
}
private void Start()
{
// 初始化客户端
InitializeClient();
}
private void Update()
{
// 在主线程处理消息队列
ProcessMessageQueue();
}
protected override void OnDestroy()
{
base.OnDestroy();
// 断开连接并清理资源
DisconnectAsync().ConfigureAwait(false);
_mqttClient?.Dispose();
}
/// <summary>
/// 初始化MQTT客户端
/// </summary>
public void InitializeClient()
{
if (_mqttClient != null)
{
_mqttClient.Dispose();
}
clientId = string.IsNullOrEmpty(clientId) ? Guid.NewGuid().ToString() : clientId;
//配置连接参数
Options = new MqttClientOptionsBuilder()
.WithTcpServer(brokerAddress, brokerPort)
.WithClientId(clientId)
.WithCredentials(username, password)
.WithCleanSession()
.WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepAlive))
.Build();
//配置重连机制
ManagedOptions = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(Options)
.Build();
// 创建管理型客户端
_mqttClient = new MqttFactory().CreateManagedMqttClient();
// 注册事件
_mqttClient.ConnectedAsync += OnConnectedAsync;
_mqttClient.DisconnectedAsync += OnDisconnectedAsync;
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
_mqttClient.ConnectingFailedAsync += OnConnectingFailedAsync;
}
private MqttClientOptions Options;
private ManagedMqttClientOptions ManagedOptions;
/// <summary>
/// 连接到MQTT服务器
/// </summary>
public async Task ConnectAsync()
{
if (_mqttClient == null)
{
InitializeClient();
}
if (!_mqttClient.IsConnected)
{
Log("连接到MQTT服务器: " + brokerAddress + ":" + brokerPort);
await _mqttClient.StartAsync(ManagedOptions);
}
else
{
Log("已经连接到MQTT服务器");
}
}
/// <summary>
/// 从MQTT服务器断开连接
/// </summary>
public async Task DisconnectAsync()
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
Log("正在断开与MQTT服务器的连接");
await _mqttClient.StopAsync();
}
}
/// <summary>
/// 发布消息到指定主题
/// </summary>
/// <param name="topic">主题</param>
/// <param name="payload">消息内容</param>
/// <param name="qosLevel">服务质量等级</param>
/// <param name="retain">是否保留消息</param>
public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false)
{
if (!IsConnected)
{
Log("无法发布消息 - 未连接到服务器");
return;
}
if (string.IsNullOrEmpty(topic))
{
Log("主题不能为空");
return;
}
try
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qosLevel)
.WithRetainFlag(retain)
.Build();
await _mqttClient.EnqueueAsync(message);
Log($"已发布消息到主题: {topic} - 内容: {payload}");
}
catch (Exception ex)
{
Log($"发布消息失败: {ex.Message}");
}
}
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topic">要订阅的主题</param>
/// <param name="qosLevel">服务质量等级</param>
public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce)
{
if (!IsConnected)
{
Log("无法订阅主题 - 未连接到服务器");
return;
}
if (string.IsNullOrEmpty(topic))
{
Log("订阅的主题不能为空");
return;
}
try
{
var topicFilter = new MqttTopicFilterBuilder()
.WithTopic(topic)
.WithQualityOfServiceLevel(qosLevel)
.Build();
// 将单个主题过滤器包装在列表中
await _mqttClient.SubscribeAsync(new List<MqttTopicFilter> { topicFilter });
Log($"已订阅主题: {topic}");
}
catch (Exception ex)
{
Log($"订阅主题失败: {ex.Message}");
}
}
/// <summary>
/// 取消订阅主题
/// </summary>
/// <param name="topic">要取消订阅的主题</param>
public async Task UnsubscribeAsync(string topic)
{
if (!IsConnected)
{
Log("无法取消订阅 - 未连接到服务器");
return;
}
if (string.IsNullOrEmpty(topic))
{
Log("取消订阅的主题不能为空");
return;
}
try
{
await _mqttClient.UnsubscribeAsync(topic);
Log($"已取消订阅主题: {topic}");
}
catch (Exception ex)
{
Log($"取消订阅失败: {ex.Message}");
}
}
/// <summary>
/// 订阅多个主题
/// </summary>
public async Task SubscribeMultipleAsync(Dictionary<string, MqttQualityOfServiceLevel> topicsWithQos)
{
if (!IsConnected)
{
Log("无法订阅主题 - 未连接到服务器");
return;
}
if (topicsWithQos == null || topicsWithQos.Count == 0)
{
Log("订阅的主题列表不能为空");
return;
}
try
{
var topicFilters = new List<MqttTopicFilter>();
foreach (var topic in topicsWithQos)
{
var topicFilter = new MqttTopicFilterBuilder()
.WithTopic(topic.Key)
.WithQualityOfServiceLevel(topic.Value)
.Build();
topicFilters.Add(topicFilter);
}
await _mqttClient.SubscribeAsync(topicFilters);
Log($"已订阅 {topicFilters.Count} 个主题");
}
catch (Exception ex)
{
Log($"批量订阅主题失败: {ex.Message}");
}
}
/// <summary>
/// 取消多个订阅
/// </summary>
/// <param name="topics"></param>
/// <returns></returns>
public async Task UnsubscribeMultipleAsync(IEnumerable<string> topics)
{
if (!IsConnected)
{
Log("无法取消订阅 - 未连接到服务器");
return;
}
if (topics == null)
{
Log("取消订阅的主题列表不能为null");
return;
}
try
{
var topicList = topics.ToList();
if (topicList.Count == 0)
{
Log("取消订阅的主题列表不能为空");
return;
}
await _mqttClient.UnsubscribeAsync(topicList);
Log($"已取消订阅 {topicList.Count} 个主题");
}
catch (Exception ex)
{
Log($"批量取消订阅失败: {ex.Message}");
}
}
/// <summary>
/// 处理接收到的消息队列(在主线程中执行)
/// </summary>
private void ProcessMessageQueue()
{
while (_messageQueue.Count > 0)
{
var messageArgs = _messageQueue.Dequeue();
string topic = messageArgs.ApplicationMessage.Topic;
string payload = Encoding.UTF8.GetString(messageArgs.ApplicationMessage.PayloadSegment.ToArray());
Log($"收到消息 - 主题: {topic}, 内容: {payload}");
// 触发消息接收事件
OnMessageReceived?.Invoke(topic, payload);
}
}
/// <summary>
/// 连接成功事件
/// </summary>
private Task OnConnectedAsync(MqttClientConnectedEventArgs args)
{
Log("已连接到MQTT服务器");
OnConnected?.Invoke();
return Task.CompletedTask;
}
/// <summary>
/// 断开连接事件
/// </summary>
private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args)
{
Log("已断开连接");
OnDisconnected?.Invoke();
return Task.CompletedTask;
}
/// <summary>
/// 接收消息事件(在后台线程)
/// </summary>
private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
{
// 将消息加入队列,在主线程处理
lock (_messageQueue)
{
_messageQueue.Enqueue(args);
}
return Task.CompletedTask;
}
/// <summary>
/// 连接失败事件
/// </summary>
private Task OnConnectingFailedAsync(ConnectingFailedEventArgs args)
{
Log($"连接失败: {args.Exception.Message},将在 {reconnectInterval} 秒后重试");
return Task.CompletedTask;
}
/// <summary>
/// 日志输出
/// </summary>
private void Log(string message)
{
Debug.Log($"[MQTTClient] {message}");
OnLog?.Invoke(message);
}
}
使用案例
using UnityEngine;
using MQTTnet.Protocol;
using System;
public class MQTTTest : MonoBehaviour
{
public MQTTClient mqttClient;
public string testTopic = "sensor/room1/temperature";
private void Start()
{
if (mqttClient != null)
{
mqttClient.OnConnected += OnMqttConnected;
mqttClient.OnDisconnected += OnMqttDisconnected;
mqttClient.OnLog += OnMqttLog;
mqttClient.OnMessageReceived += OnMqttMessageReceived;
}
else
{
Debug.LogError("未找到MQTTClient组件");
}
}
[ContextMenu("connect")]
// 连接
public async void ConnectToBroker()
{
if (mqttClient != null && !mqttClient.IsConnected)
{
await mqttClient.ConnectAsync();
}
}
[ContextMenu("disconnect")]
// 断开连接
public async void DisconnectFromBroker()
{
if (mqttClient != null && mqttClient.IsConnected)
{
await mqttClient.DisconnectAsync();
}
}
[ContextMenu("subscribe")]
// 订阅主题
public async void SubscribeToTestTopic()
{
if (mqttClient != null && mqttClient.IsConnected)
{
await mqttClient.SubscribeAsync(testTopic, MqttQualityOfServiceLevel.AtLeastOnce);
}
else
{
Debug.LogWarning("请先连接到MQTT服务器");
}
}
[ContextMenu("publish")]
// 发布消息
public async void PublishTestMessage()
{
if (mqttClient != null && mqttClient.IsConnected)
{
string message = $"Unity测试消息: {DateTime.Now:yyyy-MM-dd HH:mm:ss}";
await mqttClient.PublishAsync(
testTopic,
message,
MqttQualityOfServiceLevel.AtLeastOnce,
false
);
}
else
{
Debug.LogWarning("请先连接到MQTT服务器");
}
}
[ContextMenu("unsubscribe")]
// 取消订阅
public async void UnsubscribeFromTestTopic()
{
if (mqttClient != null && mqttClient.IsConnected)
{
await mqttClient.UnsubscribeAsync(testTopic);
}
else
{
Debug.LogWarning("请先连接到MQTT服务器");
}
}
// MQTT连接成功回调
private void OnMqttConnected()
{
Debug.Log("MQTT连接成功,准备订阅主题...");
// 连接成功后可以自动订阅必要的主题
//SubscribeToTestTopic();
}
// MQTT断开连接回调
private void OnMqttDisconnected()
{
Debug.Log("MQTT已断开连接");
}
// MQTT日志回调
private void OnMqttLog(string message)
{
// 在这里处理日志
}
// 收到MQTT消息回调
private void OnMqttMessageReceived(string topic, string payload)
{
Debug.Log($"收到来自主题 {topic} 的消息: {payload}");
// TODO:在这里处理业务逻辑
}
private void OnDestroy()
{
// 移除事件监听,避免内存泄漏
if (mqttClient != null)
{
mqttClient.OnConnected -= OnMqttConnected;
mqttClient.OnDisconnected -= OnMqttDisconnected;
mqttClient.OnLog -= OnMqttLog;
mqttClient.OnMessageReceived -= OnMqttMessageReceived;
}
}
}
转载自CSDN-专业IT技术社区
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_44347839/article/details/150930079