Net6 基于MQTTnet 实现 MQTT服务

8/5/2022 4:13:05 PM
2330
0

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。

MQTT 的工作原理

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

关于 MQTT Broker 的更多详情,请参阅文章 2023 年最全面的 MQTT Broker 比较指南

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。


MQTT 发布-订阅模式

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:

chat/room/1

sensor/10/temperature

sensor/+/temperature

MQTT 主题支持以下两种通配符:+ 和 #。

  • +:表示单层通配符,例如 a/+ 匹配 a/x 或 a/y。
  • #:表示多层通配符,例如 a/# 匹配 a/x、a/b/c/d。

注意:通配符主题只能用于订阅,不能用于发布。

关于 MQTT 主题的更多详情,请参阅文章通过案例理解 MQTT 主题与通配符

QoS

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

关于 MQTT QoS 的更多详情,请参阅文章 MQTT QoS 0, 1, 2 介绍

MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

开始使用 MQTT:快速教程

下面我们将通过一些简单的示例来展示如何使用 MQTT。在开始之前,需要准备 MQTT Broker 和 MQTT 客户端。

 

使用vs2022  netcore  d的webapi模板创建一个net6项目,并使用nuget 添加引用 MQTTnet.AspNetCore 

添加配置,可选

 "MqttOption": {
    "Host": "127.0.0.1",
    "Port": "61613",
    "Timeout": "5000",
    "UserName": "admin",
    "Password": "123456"
  }

创建配置项类

  /// <summary>
    /// mqtt 服务器配置项
    /// </summary>
    public class MqttOption
    {
        public string Host { get; set; } = "";
        public int  Port { get; set; }= 61613;
        public int Timeout { get; set; } = 5000;
        public string UserName { get; set; } = "";
        public string Password { get; set; } = "";
        public int MaxConnect { get; set; } = 100;
    }

创建一个server,用于处理和发送mqtt 消息

public class MqttService
    {

        /// <summary>
        ///  mqttnet  Server
        /// </summary>
        public static MqttServer MqttNetServer { get; set; }


        public static void PublishData(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_01",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };

            MqttNetServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
            {
                SenderClientId = "Server_01"
            }).GetAwaiter().GetResult();
        }
    }

 

我们创建一个 IHostedService  服务,用于随服务自动启动,用于接收mqtt客户端连接和断开的处理

  public class MqttServiceHost : IHostedService, IDisposable
    {
        public readonly  MqttService _mqttServer;
        private readonly IHostApplicationLifetime _appLifetime;
        private readonly ILogger _logger;
        private readonly MqttOption _mqttOption;

        const string ServerClientId = "SERVER";



        public MqttServiceHost(MqttService mqttServer, ILogger<MqttServiceHost> logger, IHostApplicationLifetime appLifetime, IOptions<MqttOption> mqttOptions)
        {
            _mqttServer = mqttServer;
            _logger = logger;
            _appLifetime = appLifetime;
            _mqttOption = mqttOptions.Value;
        }

        public void Dispose()
        {
            throw new NotImplementedException();
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {

            MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
            optionsBuilder.WithDefaultEndpoint();
            optionsBuilder.WithDefaultEndpointPort(_mqttOption.Port); // 设置 服务端 端口号
            optionsBuilder.WithConnectionBacklog(_mqttOption.MaxConnect); // 最大连接数
            optionsBuilder.WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(_mqttOption.Timeout));

            MqttServerOptions options = optionsBuilder.Build();

            MqttService.MqttNetServer = new MqttFactory().CreateMqttServer(options);

            MqttService.MqttNetServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
            MqttService.MqttNetServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
            MqttService.MqttNetServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件

            MqttService.MqttNetServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
            MqttService.MqttNetServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
            MqttService.MqttNetServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
            MqttService.MqttNetServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
            MqttService.MqttNetServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
            MqttService.MqttNetServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关

            MqttService.MqttNetServer.StartAsync();

            _appLifetime.ApplicationStarted.Register(OnStarted);
            _appLifetime.ApplicationStopping.Register(OnStoping);
            _appLifetime.ApplicationStopped.Register(OnStopped);
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        //系统启动后
        private void OnStarted()
        {
            _logger.LogInformation("OnStarted has been called.");
        }

        //系统结束前
        private void OnStoping()
        {
            _logger.LogInformation("OnStoping has been called.");
            MqttService.MqttNetServer.StopAsync();      //mysql服务停止
        }

        //系统结束后
        private void OnStopped()
        {
            _logger.LogInformation("OnStopped has been called.");
        }

        /// <summary>
        /// 客户端订阅主题事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StoppedAsync(EventArgs arg)
        {
            Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 用户名和密码验证有关
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            arg.ReasonCode = MqttConnectReasonCode.Success;
            if ((arg.Username ?? string.Empty) != _mqttOption.UserName || (arg.Password ?? String.Empty) != _mqttOption.Password)
            {
                arg.ReasonCode = MqttConnectReasonCode.Banned;
                Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");

            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            if (string.Equals(arg.ClientId, ServerClientId))
            {
                return Task.CompletedTask;
            }

            Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 启动后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StartedAsync(EventArgs arg)
        {
            Console.WriteLine($"StartedAsync:MQTT服务已启动……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端取消订阅事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】  ");
            return Task.CompletedTask;
        }

        private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端断开时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端连接时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }
    }

在 Program.cs中注册服务

 //MqttOption选项的绑定
builder.Services.Configure<MqttOption>(builder.Configuration.GetSection("MqttOption"));
builder.Services.AddHostedService<MqttServiceHost>();     //mqtt服务

builder.Services.AddSingleton<MqttService>();

创建一个用于给客户端发送消息的 api

  public class MqttClientTestController:Controller
  {
        

        public IActionResult PushMsg(string msg)
        {

            MqttService.PublishData(msg);
            return Content("");
        }
  }

 

使用客户端工具,添加连接,添加订阅频道。这里我们使用了MQTTX 客户端,请自行下载

并添加订阅,订阅名字我们写死在代码中了,请使用代码中的订阅名

我们使用发消息的api,对这个客户端进行发送,因为频道已经写死在代码中了,所以直接发送消息即可

测试效果,如下图,并且测试了对客户端消息的响应

 

原码下载:下载

全部评论



提问