关注

Unity MQTT通讯

首先明确概念,什么是MQTT?

MQTT是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的物联网(IoT)通信协议,在带宽有限、网络不稳定的环境下,实现低功耗、低延迟的设备间通信,是物联网领域的核心通信协议之一(被 OASIS 标准化,广泛支持于 AWS IoT、Azure IoT、阿里云 IoT 等平台)。

核心设计目标

MQTT 的所有特性均围绕 “适配物联网场景” 展开,核心目标可概括为 4 点:

轻量级
协议头部极小(固定头部仅 2 字节,可变头部和负载按需添加),设备侧客户端代码体积可压缩至几十 KB,适配单片机、传感器等资源受限设备(如 Arduino、ESP8266)。
低带宽 / 低功耗
减少数据传输量(无冗余字段),支持 “休眠 - 唤醒” 模式,适合 2G/4G/NB-IoT 等窄带或按流量计费的网络。
高可靠性
通过 “服务质量(QoS)” 机制保障消息不丢失、不重复,应对物联网常见的 “网络波动、设备离线” 场景。
解耦通信
基于 “发布 / 订阅” 模式,设备(发布者)和平台(订阅者)无需直接交互,甚至无需知道对方的 IP / 端口,降低系统耦合度。

核心架构:发布 / 订阅模式

MQTT 不采用 “点对点(P2P)” 直接通信,而是通过中间代理(Broker) 实现消息转发,架构包含 3 个核心角色

角色功能典型示例
发布者(Publisher)发送消息的设备 / 应用,无需知道谁会接收消息,仅需指定消息的 “主题(Topic)”。温湿度传感器、智能门锁
订阅者(Subscriber)接收消息的设备 / 应用,需提前 “订阅” 感兴趣的 “主题(Topic)”,仅接收对应消息。物联网平台、手机 APP、PLC
代理(Broker)核心中间件,负责接收发布者的消息,根据 “主题” 匹配订阅者,并将消息转发给订阅者;同时管理设备连接、会话、QoS 等逻辑。EMQX、Mosquitto、AWS IoT Core

关键概念:主题(Topic)

“主题” 是 MQTT 消息的 “分类标签”,类似文件系统的路径(用/分隔层级),用于实现消息的精准路由。例如:

传感器主题:sensor/room1/temperature(1 号房间温度传感器)、sensor/room2/humidity(2 号房间湿度传感器);
设备控制主题:device/light/room1/control(控制 1 号房间灯光)。

这里借助的MQTT的插件是MQTTnet+拓展MQTTnet.Extensions.ManagedClient,这个拓展提供了一个托管的 MQTT 客户端实现,简化了 MQTT 客户端的使用并增强了可靠性。它自动处理连接管理、重连逻辑、消息队列等功能,非常适合需要稳定 MQTT 通信的应用程序。
https://download.csdn.net/download/weixin_44347839/91777855

using UnityEngine;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;
using MQTTnet.Packets;
using System.Linq;
using MyFrameworkPro;
public class MQTTClient : MonoSingleton<MQTTClient>
{
    [Header("MQTT Broker Settings")]
    [Tooltip("MQTT服务器地址")]
    public string brokerAddress = "test.mosquitto.org";

    [Tooltip("MQTT服务器端口")]
    public int brokerPort = 1883;

    [Tooltip("客户端ID,为空则自动生成")]
    public string clientId = "";

    [Tooltip("用户名,无则留空")]
    public string username = "";

    [Tooltip("密码,无则留空")]
    public string password = "";

    [Header("Connection Settings")]
    [Tooltip("是否使用TLS加密连接")]
    public bool useTls = false;

    [Tooltip("自动重连间隔(秒)")]
    public int reconnectInterval = 5;

    [Tooltip("心跳包间隔(秒)")]
    private int KeepAlive = 60;
    // 管理型MQTT客户端
    private IManagedMqttClient _mqttClient;

    // 连接状态
    public bool IsConnected => _mqttClient?.IsConnected ?? false;

    // 消息队列,用于在主线程处理接收到的消息
    private readonly Queue<MqttApplicationMessageReceivedEventArgs> _messageQueue =
        new Queue<MqttApplicationMessageReceivedEventArgs>();

    // 事件定义
    public event Action OnConnected;
    public event Action OnDisconnected;
    public event Action<string> OnLog;
    public event Action<string, string> OnMessageReceived; // 主题, 消息内容

    private void Awake()
    {
        // 确保场景中只存在一个MQTT客户端实例
        if (FindObjectsOfType<MQTTClient>().Length > 1)
        {
            Destroy(gameObject);
        }
        else
        {
            DontDestroyOnLoad(gameObject);
        }
    }

    private void Start()
    {
        // 初始化客户端
        InitializeClient();
    }

    private void Update()
    {
        // 在主线程处理消息队列
        ProcessMessageQueue();
    }
    protected override void OnDestroy()
    {
        base.OnDestroy();
        // 断开连接并清理资源
        DisconnectAsync().ConfigureAwait(false);
        _mqttClient?.Dispose();
    }
    /// <summary>
    /// 初始化MQTT客户端
    /// </summary>
    public void InitializeClient()
    {
        if (_mqttClient != null)
        {
            _mqttClient.Dispose();
        }
        clientId = string.IsNullOrEmpty(clientId) ? Guid.NewGuid().ToString() : clientId;
        //配置连接参数
        Options = new MqttClientOptionsBuilder()
            .WithTcpServer(brokerAddress, brokerPort)
            .WithClientId(clientId)
            .WithCredentials(username, password)
            .WithCleanSession()
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepAlive))
            .Build();

        //配置重连机制
        ManagedOptions = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(Options)
            .Build();
        // 创建管理型客户端
        _mqttClient = new MqttFactory().CreateManagedMqttClient();

        // 注册事件
        _mqttClient.ConnectedAsync += OnConnectedAsync;
        _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
        _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
        _mqttClient.ConnectingFailedAsync += OnConnectingFailedAsync;
    }
    private MqttClientOptions Options;
    private ManagedMqttClientOptions ManagedOptions;
    /// <summary>
    /// 连接到MQTT服务器
    /// </summary>
    public async Task ConnectAsync()
    {
        if (_mqttClient == null)
        {
            InitializeClient();
        }

        if (!_mqttClient.IsConnected)
        {
            Log("连接到MQTT服务器: " + brokerAddress + ":" + brokerPort);
            await _mqttClient.StartAsync(ManagedOptions);
        }
        else
        {
            Log("已经连接到MQTT服务器");
        }
    }

    /// <summary>
    /// 从MQTT服务器断开连接
    /// </summary>
    public async Task DisconnectAsync()
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            Log("正在断开与MQTT服务器的连接");
            await _mqttClient.StopAsync();
        }
    }

    /// <summary>
    /// 发布消息到指定主题
    /// </summary>
    /// <param name="topic">主题</param>
    /// <param name="payload">消息内容</param>
    /// <param name="qosLevel">服务质量等级</param>
    /// <param name="retain">是否保留消息</param>
    public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false)
    {
        if (!IsConnected)
        {
            Log("无法发布消息 - 未连接到服务器");
            return;
        }

        if (string.IsNullOrEmpty(topic))
        {
            Log("主题不能为空");
            return;
        }

        try
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(qosLevel)
                .WithRetainFlag(retain)
                .Build();

            await _mqttClient.EnqueueAsync(message);
            Log($"已发布消息到主题: {topic} - 内容: {payload}");
        }
        catch (Exception ex)
        {
            Log($"发布消息失败: {ex.Message}");
        }
    }


    /// <summary>
    /// 订阅主题
    /// </summary>
    /// <param name="topic">要订阅的主题</param>
    /// <param name="qosLevel">服务质量等级</param>
    public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce)
    {
        if (!IsConnected)
        {
            Log("无法订阅主题 - 未连接到服务器");
            return;
        }

        if (string.IsNullOrEmpty(topic))
        {
            Log("订阅的主题不能为空");
            return;
        }

        try
        {
            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(qosLevel)
                .Build();

            // 将单个主题过滤器包装在列表中
            await _mqttClient.SubscribeAsync(new List<MqttTopicFilter> { topicFilter });
            Log($"已订阅主题: {topic}");
        }
        catch (Exception ex)
        {
            Log($"订阅主题失败: {ex.Message}");
        }
    }



    /// <summary>
    /// 取消订阅主题
    /// </summary>
    /// <param name="topic">要取消订阅的主题</param>
    public async Task UnsubscribeAsync(string topic)
    {
        if (!IsConnected)
        {
            Log("无法取消订阅 - 未连接到服务器");
            return;
        }

        if (string.IsNullOrEmpty(topic))
        {
            Log("取消订阅的主题不能为空");
            return;
        }

        try
        {
            await _mqttClient.UnsubscribeAsync(topic);
            Log($"已取消订阅主题: {topic}");
        }
        catch (Exception ex)
        {
            Log($"取消订阅失败: {ex.Message}");
        }
    }

    /// <summary>
    /// 订阅多个主题
    /// </summary>
    public async Task SubscribeMultipleAsync(Dictionary<string, MqttQualityOfServiceLevel> topicsWithQos)
    {
        if (!IsConnected)
        {
            Log("无法订阅主题 - 未连接到服务器");
            return;
        }

        if (topicsWithQos == null || topicsWithQos.Count == 0)
        {
            Log("订阅的主题列表不能为空");
            return;
        }

        try
        {
            var topicFilters = new List<MqttTopicFilter>();
            foreach (var topic in topicsWithQos)
            {
                var topicFilter = new MqttTopicFilterBuilder()
                    .WithTopic(topic.Key)
                    .WithQualityOfServiceLevel(topic.Value)
                    .Build();
                topicFilters.Add(topicFilter);
            }

            await _mqttClient.SubscribeAsync(topicFilters);
            Log($"已订阅 {topicFilters.Count} 个主题");
        }
        catch (Exception ex)
        {
            Log($"批量订阅主题失败: {ex.Message}");
        }
    }
    /// <summary>
    /// 取消多个订阅
    /// </summary>
    /// <param name="topics"></param>
    /// <returns></returns>
    public async Task UnsubscribeMultipleAsync(IEnumerable<string> topics)
    {
        if (!IsConnected)
        {
            Log("无法取消订阅 - 未连接到服务器");
            return;
        }

        if (topics == null)
        {
            Log("取消订阅的主题列表不能为null");
            return;
        }

        try
        {
            var topicList = topics.ToList();
            if (topicList.Count == 0)
            {
                Log("取消订阅的主题列表不能为空");
                return;
            }

            await _mqttClient.UnsubscribeAsync(topicList);
            Log($"已取消订阅 {topicList.Count} 个主题");
        }
        catch (Exception ex)
        {
            Log($"批量取消订阅失败: {ex.Message}");
        }
    }
    /// <summary>
    /// 处理接收到的消息队列(在主线程中执行)
    /// </summary>
    private void ProcessMessageQueue()
    {
        while (_messageQueue.Count > 0)
        {
            var messageArgs = _messageQueue.Dequeue();
            string topic = messageArgs.ApplicationMessage.Topic;
            string payload = Encoding.UTF8.GetString(messageArgs.ApplicationMessage.PayloadSegment.ToArray());

            Log($"收到消息 - 主题: {topic}, 内容: {payload}");

            // 触发消息接收事件
            OnMessageReceived?.Invoke(topic, payload);
        }
    }

    /// <summary>
    /// 连接成功事件
    /// </summary>
    private Task OnConnectedAsync(MqttClientConnectedEventArgs args)
    {
        Log("已连接到MQTT服务器");
        OnConnected?.Invoke();
        return Task.CompletedTask;
    }

    /// <summary>
    /// 断开连接事件
    /// </summary>
    private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args)
    {
        Log("已断开连接");
        OnDisconnected?.Invoke();
        return Task.CompletedTask;
    }

    /// <summary>
    /// 接收消息事件(在后台线程)
    /// </summary>
    private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
    {
        // 将消息加入队列,在主线程处理
        lock (_messageQueue)
        {
            _messageQueue.Enqueue(args);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// 连接失败事件
    /// </summary>
    private Task OnConnectingFailedAsync(ConnectingFailedEventArgs args)
    {
        Log($"连接失败: {args.Exception.Message},将在 {reconnectInterval} 秒后重试");
        return Task.CompletedTask;
    }

    /// <summary>
    /// 日志输出
    /// </summary>
    private void Log(string message)
    {
        Debug.Log($"[MQTTClient] {message}");
        OnLog?.Invoke(message);
    }
}

使用案例

using UnityEngine;
using MQTTnet.Protocol;
using System;

public class MQTTTest : MonoBehaviour
{
    public MQTTClient mqttClient;

    public string testTopic = "sensor/room1/temperature";

    private void Start()
    {
        if (mqttClient != null)
        {
            mqttClient.OnConnected += OnMqttConnected;
            mqttClient.OnDisconnected += OnMqttDisconnected;
            mqttClient.OnLog += OnMqttLog;
            mqttClient.OnMessageReceived += OnMqttMessageReceived;
        }
        else
        {
            Debug.LogError("未找到MQTTClient组件");
        }
    }
    [ContextMenu("connect")]
    // 连接
    public async void ConnectToBroker()
    {
        if (mqttClient != null && !mqttClient.IsConnected)
        {
            await mqttClient.ConnectAsync();
        }
    }
    [ContextMenu("disconnect")]
    // 断开连接
    public async void DisconnectFromBroker()
    {
        if (mqttClient != null && mqttClient.IsConnected)
        {
            await mqttClient.DisconnectAsync();
        }
    }
    [ContextMenu("subscribe")]
    // 订阅主题
    public async void SubscribeToTestTopic()
    {
        if (mqttClient != null && mqttClient.IsConnected)
        {
            await mqttClient.SubscribeAsync(testTopic, MqttQualityOfServiceLevel.AtLeastOnce);
        }
        else
        {
            Debug.LogWarning("请先连接到MQTT服务器");
        }
    }
    [ContextMenu("publish")]
    // 发布消息
    public async void PublishTestMessage()
    {
        if (mqttClient != null && mqttClient.IsConnected)
        {
            string message = $"Unity测试消息: {DateTime.Now:yyyy-MM-dd HH:mm:ss}";
            await mqttClient.PublishAsync(
                testTopic,
                message,
                MqttQualityOfServiceLevel.AtLeastOnce,
                false
            );
        }
        else
        {
            Debug.LogWarning("请先连接到MQTT服务器");
        }
    }
    [ContextMenu("unsubscribe")]
    // 取消订阅
    public async void UnsubscribeFromTestTopic()
    {
        if (mqttClient != null && mqttClient.IsConnected)
        {
            await mqttClient.UnsubscribeAsync(testTopic);
        }
        else
        {
            Debug.LogWarning("请先连接到MQTT服务器");
        }
    }

    // MQTT连接成功回调
    private void OnMqttConnected()
    {
        Debug.Log("MQTT连接成功,准备订阅主题...");
        // 连接成功后可以自动订阅必要的主题
        //SubscribeToTestTopic();
    }

    // MQTT断开连接回调
    private void OnMqttDisconnected()
    {
        Debug.Log("MQTT已断开连接");
    }

    // MQTT日志回调
    private void OnMqttLog(string message)
    {
        // 在这里处理日志
    }

    // 收到MQTT消息回调
    private void OnMqttMessageReceived(string topic, string payload)
    {
        Debug.Log($"收到来自主题 {topic} 的消息: {payload}");

        // TODO:在这里处理业务逻辑
    }

    private void OnDestroy()
    {
        // 移除事件监听,避免内存泄漏
        if (mqttClient != null)
        {
            mqttClient.OnConnected -= OnMqttConnected;
            mqttClient.OnDisconnected -= OnMqttDisconnected;
            mqttClient.OnLog -= OnMqttLog;
            mqttClient.OnMessageReceived -= OnMqttMessageReceived;
        }
    }
}

转载自CSDN-专业IT技术社区

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/weixin_44347839/article/details/150930079

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--