MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。
要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。
MQTT 客户端
任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。
MQTT Broker
MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。
关于 MQTT Broker 的更多详情,请参阅文章 2023 年最全面的 MQTT Broker 比较指南。
发布-订阅模式
发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。
下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。
主题
MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:
chat/room/1
sensor/10/temperature
sensor/+/temperature
MQTT 主题支持以下两种通配符:+ 和 #。
注意:通配符主题只能用于订阅,不能用于发布。
关于 MQTT 主题的更多详情,请参阅文章通过案例理解 MQTT 主题与通配符。
QoS
MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。
关于 MQTT QoS 的更多详情,请参阅文章 MQTT QoS 0, 1, 2 介绍。
在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:
下面我们将通过一些简单的示例来展示如何使用 MQTT。在开始之前,需要准备 MQTT Broker 和 MQTT 客户端。
使用vs2022 netcore d的webapi模板创建一个net6项目,并使用nuget 添加引用 MQTTnet.AspNetCore
添加配置,可选
"MqttOption": {
"Host": "127.0.0.1",
"Port": "61613",
"Timeout": "5000",
"UserName": "admin",
"Password": "123456"
}
创建配置项类
/// <summary>
/// mqtt 服务器配置项
/// </summary>
public class MqttOption
{
public string Host { get; set; } = "";
public int Port { get; set; }= 61613;
public int Timeout { get; set; } = 5000;
public string UserName { get; set; } = "";
public string Password { get; set; } = "";
public int MaxConnect { get; set; } = 100;
}
创建一个server,用于处理和发送mqtt 消息
public class MqttService
{
/// <summary>
/// mqttnet Server
/// </summary>
public static MqttServer MqttNetServer { get; set; }
public static void PublishData(string data)
{
var message = new MqttApplicationMessage
{
Topic = "topic_01",
Payload = Encoding.Default.GetBytes(data),
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
Retain = true // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
};
MqttNetServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
{
SenderClientId = "Server_01"
}).GetAwaiter().GetResult();
}
}
我们创建一个 IHostedService 服务,用于随服务自动启动,用于接收mqtt客户端连接和断开的处理
public class MqttServiceHost : IHostedService, IDisposable
{
public readonly MqttService _mqttServer;
private readonly IHostApplicationLifetime _appLifetime;
private readonly ILogger _logger;
private readonly MqttOption _mqttOption;
const string ServerClientId = "SERVER";
public MqttServiceHost(MqttService mqttServer, ILogger<MqttServiceHost> logger, IHostApplicationLifetime appLifetime, IOptions<MqttOption> mqttOptions)
{
_mqttServer = mqttServer;
_logger = logger;
_appLifetime = appLifetime;
_mqttOption = mqttOptions.Value;
}
public void Dispose()
{
throw new NotImplementedException();
}
public Task StartAsync(CancellationToken cancellationToken)
{
MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
optionsBuilder.WithDefaultEndpoint();
optionsBuilder.WithDefaultEndpointPort(_mqttOption.Port); // 设置 服务端 端口号
optionsBuilder.WithConnectionBacklog(_mqttOption.MaxConnect); // 最大连接数
optionsBuilder.WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(_mqttOption.Timeout));
MqttServerOptions options = optionsBuilder.Build();
MqttService.MqttNetServer = new MqttFactory().CreateMqttServer(options);
MqttService.MqttNetServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
MqttService.MqttNetServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
MqttService.MqttNetServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件
MqttService.MqttNetServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
MqttService.MqttNetServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
MqttService.MqttNetServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
MqttService.MqttNetServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
MqttService.MqttNetServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
MqttService.MqttNetServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关
MqttService.MqttNetServer.StartAsync();
_appLifetime.ApplicationStarted.Register(OnStarted);
_appLifetime.ApplicationStopping.Register(OnStoping);
_appLifetime.ApplicationStopped.Register(OnStopped);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
//系统启动后
private void OnStarted()
{
_logger.LogInformation("OnStarted has been called.");
}
//系统结束前
private void OnStoping()
{
_logger.LogInformation("OnStoping has been called.");
MqttService.MqttNetServer.StopAsync(); //mysql服务停止
}
//系统结束后
private void OnStopped()
{
_logger.LogInformation("OnStopped has been called.");
}
/// <summary>
/// 客户端订阅主题事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 关闭后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StoppedAsync(EventArgs arg)
{
Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
return Task.CompletedTask;
}
/// <summary>
/// 用户名和密码验证有关
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
arg.ReasonCode = MqttConnectReasonCode.Success;
if ((arg.Username ?? string.Empty) != _mqttOption.UserName || (arg.Password ?? String.Empty) != _mqttOption.Password)
{
arg.ReasonCode = MqttConnectReasonCode.Banned;
Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
if (string.Equals(arg.ClientId, ServerClientId))
{
return Task.CompletedTask;
}
Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 启动后事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_StartedAsync(EventArgs arg)
{
Console.WriteLine($"StartedAsync:MQTT服务已启动……");
return Task.CompletedTask;
}
/// <summary>
/// 客户端取消订阅事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】 ");
return Task.CompletedTask;
}
private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
{
Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
return Task.CompletedTask;
}
/// <summary>
/// 客户端断开时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
/// <summary>
/// 客户端连接时候触发
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】 ");
return Task.CompletedTask;
}
}
在 Program.cs中注册服务
//MqttOption选项的绑定
builder.Services.Configure<MqttOption>(builder.Configuration.GetSection("MqttOption"));
builder.Services.AddHostedService<MqttServiceHost>(); //mqtt服务
builder.Services.AddSingleton<MqttService>();
创建一个用于给客户端发送消息的 api
public class MqttClientTestController:Controller
{
public IActionResult PushMsg(string msg)
{
MqttService.PublishData(msg);
return Content("");
}
}
使用客户端工具,添加连接,添加订阅频道。这里我们使用了MQTTX 客户端,请自行下载
并添加订阅,订阅名字我们写死在代码中了,请使用代码中的订阅名
我们使用发消息的api,对这个客户端进行发送,因为频道已经写死在代码中了,所以直接发送消息即可
测试效果,如下图,并且测试了对客户端消息的响应
原码下载:下载
本文链接:https://blog.nnwk.net/article/117
有问题请留言。版权所有,转载请在显眼位置处保留文章出处,并留下原文连接
Leave your question and I'll get back to you as soon as I see it. All rights reserved. Please keep the source and links
友情链接:
子卿全栈
全部评论