|
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Client.Options;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using HBLConsole.GVL;
- using HBLConsole.Service;
-
namespace HBLConsole.Communication
{
    /// <summary>
    /// Singleton wrapper around an MQTTnet <see cref="IMqttClient"/> that connects,
    /// reconnects after a disconnect, subscribes to topics and publishes messages.
    /// Progress and failures are reported through <c>MessageLog</c>.
    /// </summary>
    public class MqttHelper
    {
        // Lazy<T> gives thread-safe, on-first-use construction; the previous
        // "_Instance ?? (_Instance = new ...)" could create two instances under a race.
        private static readonly Lazy<MqttHelper> _Instance = new Lazy<MqttHelper>(() => new MqttHelper());

        /// <summary>
        /// The single shared <see cref="MqttHelper"/> instance.
        /// </summary>
        public static MqttHelper GetInstance => _Instance.Value;

        private MqttHelper() { }

        private IMqttClient client;
        private IMqttClientOptions options;

        // 0 = no reconnect loop running, 1 = a reconnect loop is in progress.
        // Changed only through Interlocked so that concurrent disconnect
        // notifications cannot start a second loop.
        private int reconnecting;

        /// <summary>
        /// Invoked for every application message received from the broker.
        /// </summary>
        public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }

        /// <summary>
        /// Invoked once the initial connection made by <see cref="MqttInitAsync"/> succeeds.
        /// </summary>
        public Action ConnectOk { get; set; }

        /// <summary>
        /// Invoked after the connection has been re-established following a disconnect.
        /// </summary>
        public Action Reconnection { get; set; }

        /// <summary>
        /// Builds the client options, creates the client, wires the disconnected /
        /// message-received / connected handlers and attempts the first connection.
        /// </summary>
        /// <param name="UserName">User name sent to the broker.</param>
        /// <param name="pass">Password sent to the broker.</param>
        /// <param name="IP">Broker host name or IP address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientID">MQTT client identifier.</param>
        public async void MqttInitAsync(string UserName, string pass, string IP, int port, string clientID)
        {
            options = new MqttClientOptionsBuilder()
                .WithTcpServer(IP, port)
                .WithClientId(clientID)
                .WithCredentials(UserName, pass)
                .Build();

            client = new MqttFactory().CreateMqttClient();
            client.UseDisconnectedHandler(async e =>
            {
                await ReconnectAsync();
            }).UseApplicationMessageReceivedHandler(e =>
            {
                // No receiver registered is a valid state; do not throw on it.
                MqttReceive?.Invoke(e);
            }).UseConnectedHandler(e =>
            {
                MessageLog.GetInstance.Show($"连接成功");
            });

            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.Show(ex.Message);
                MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
            }

            if (client.IsConnected)
            {
                MessageLog.GetInstance.Show("MQTT连接成功!");
                ConnectOk?.Invoke();
            }
        }

        /// <summary>
        /// Reconnect loop run from the disconnected handler. Waits for the network to
        /// come back, then retries <c>ConnectAsync</c> until the client reports it is
        /// connected, and finally raises <see cref="Reconnection"/>.
        /// Only one loop runs at a time; notifications that arrive while a loop is
        /// already running return immediately.
        /// </summary>
        private async Task ReconnectAsync()
        {
            // Atomic test-and-set: the previous null check on a delegate field was racy.
            if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0)
            {
                return;
            }

            try
            {
                // Task.Delay instead of Thread.Sleep so no thread is held while waiting.
                await Task.Delay(2000);
                while (!InternetInfo.NetworkConnectState)
                {
                    await Task.Delay(2000);
                }

                MessageLog.GetInstance.Show($"断开连接");

                // Only the first failure is logged in full to avoid flooding the log
                // during a long outage.
                bool errorLogged = false;
                while (!client.IsConnected)
                {
                    try
                    {
                        MessageLog.GetInstance.Show($"重连中");
                        await client.ConnectAsync(options);
                    }
                    catch (Exception ex)
                    {
                        if (!errorLogged)
                        {
                            MessageLog.GetInstance.Show(ex.ToString());
                            errorLogged = true;
                        }
                    }

                    // Back off only when the attempt did not succeed.
                    if (!client.IsConnected)
                    {
                        await Task.Delay(3000);
                    }
                }

                MessageLog.GetInstance.Show("MQTT重连成功!");
                Reconnection?.Invoke();
            }
            finally
            {
                // Always release the guard; otherwise an exception (for example from
                // the Reconnection callback) would disable reconnecting for good.
                Interlocked.Exchange(ref reconnecting, 0);
            }
        }

        /// <summary>
        /// Subscribes to a single topic with exactly-once QoS. Does nothing when the
        /// client has not been created or is not connected; failures are logged.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        public async void MqttSubscriptionAsync(string topic)
        {
            await SubscribeCoreAsync(topic);
        }

        /// <summary>
        /// Subscribes to each topic in turn with exactly-once QoS. A failure on one
        /// topic is logged and does not prevent the remaining topics from being tried.
        /// </summary>
        /// <param name="topic">Topics to subscribe to; a null array is ignored.</param>
        public async void MqttSubscriptionAsync(string[] topic)
        {
            if (topic == null)
            {
                return;
            }

            foreach (var item in topic)
            {
                await SubscribeCoreAsync(item);
            }
        }

        /// <summary>
        /// Shared subscribe implementation. Never throws, because its callers are
        /// <c>async void</c> and cannot surface an exception to their own callers.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        private async Task SubscribeCoreAsync(string topic)
        {
            try
            {
                if (client == null || !client.IsConnected)
                {
                    return;
                }

                var filter = new MqttTopicFilterBuilder()
                    .WithTopic(topic)
                    .WithExactlyOnceQoS()
                    .Build();
                await client.SubscribeAsync(filter);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.Show($"MQTT订阅失败 [{topic}]: {ex.Message}");
            }
        }

        /// <summary>
        /// Publishes a text payload to a topic with exactly-once QoS. Does nothing when
        /// the client has not been created or is not connected; failures are logged.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="content">Payload to publish.</param>
        public async void MqttPublishAsync(string topic, string content)
        {
            try
            {
                if (client == null || !client.IsConnected)
                {
                    return;
                }

                // Built inside the try so that a builder failure is logged as well
                // instead of escaping from an async void method.
                var msg = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(content)
                    .WithExactlyOnceQoS()
                    .Build();
                await client.PublishAsync(msg);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.Show($"MQTT发布失败 [{topic}]: {ex.Message}");
            }
        }
    }
}
|