using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using HBLConsole.GVL;
using HBLConsole.Service;
namespace HBLConsole.Communication
{
/// <summary>
/// Process-wide singleton wrapper around an MQTTnet client: connects to a broker,
/// re-establishes the connection after it drops, and exposes subscribe and publish helpers.
/// </summary>
public class MqttHelper
{
    // Lazy<T> guarantees a single instance even when the first access happens concurrently.
    private static readonly Lazy<MqttHelper> _Instance = new Lazy<MqttHelper>(() => new MqttHelper());

    /// <summary>
    /// Gets the shared <see cref="MqttHelper"/> instance.
    /// </summary>
    public static MqttHelper GetInstance => _Instance.Value;

    private MqttHelper() { }

    private IMqttClient client;
    private IMqttClientOptions options;

    // 0 = no reconnect loop running, 1 = a reconnect loop is in progress.
    // A failed connection attempt can raise the disconnected handler again, so this
    // flag keeps a second loop from starting while the first one is still retrying.
    private int reconnecting;

    /// <summary>
    /// Invoked for every application message received from the broker.
    /// </summary>
    public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }

    /// <summary>
    /// Invoked after the connection attempt started by <see cref="MqttInitAsync"/> succeeds.
    /// </summary>
    public Action ConnectOk { get; set; }

    /// <summary>
    /// Invoked after the connection has been re-established following a disconnect.
    /// </summary>
    public Action Reconnection { get; set; }

    /// <summary>
    /// Builds the client options, creates the client, registers the handlers and
    /// attempts the first connection.
    /// </summary>
    /// <param name="UserName">User name for broker authentication.</param>
    /// <param name="pass">Password for broker authentication.</param>
    /// <param name="IP">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientID">Client identifier presented to the broker.</param>
    /// <remarks>
    /// The method is <c>async void</c> for compatibility with existing callers, so it cannot be
    /// awaited; use <see cref="ConnectOk"/> to learn when the connection is established.
    /// </remarks>
    public async void MqttInitAsync(string UserName, string pass, string IP, int port, string clientID)
    {
        options = new MqttClientOptionsBuilder()
            .WithTcpServer(IP, port)
            .WithClientId(clientID)
            .WithCredentials(UserName, pass)
            .Build();
        client = new MqttFactory().CreateMqttClient();

        client.UseDisconnectedHandler(async e =>
        {
            // Only one reconnect loop at a time; later notifications are ignored while it runs.
            if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0)
            {
                return;
            }

            try
            {
                await ReconnectAsync();
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.ToString());
            }
            finally
            {
                // Always release the guard, otherwise a single failure would disable reconnection.
                Interlocked.Exchange(ref reconnecting, 0);
            }
        });

        client.UseApplicationMessageReceivedHandler(c =>
        {
            // No subscriber registered is not an error; the message is simply dropped.
            MqttReceive?.Invoke(c);
        });

        client.UseConnectedHandler(e =>
        {
            MessageLog.GetInstance.Show($"连接成功");
        });

        try
        {
            await client.ConnectAsync(options);
        }
        catch (Exception ex)
        {
            MessageLog.GetInstance.ShowEx(ex.Message);
            MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
        }

        if (client.IsConnected)
        {
            MessageLog.GetInstance.Show("MQTT连接成功!");
            ConnectOk?.Invoke();
        }
    }

    /// <summary>
    /// Waits for the network to come back and then retries the connection until it succeeds,
    /// finally raising <see cref="Reconnection"/>.
    /// </summary>
    private async Task ReconnectAsync()
    {
        await Task.Delay(2000);
        while (!InternetInfo.NetworkConnectState)
        {
            await Task.Delay(2000);
        }

        MessageLog.GetInstance.Show($"断开连接");

        // Log only the first failure so a long outage does not flood the log.
        bool errorLogged = false;
        while (!client.IsConnected)
        {
            try
            {
                MessageLog.GetInstance.Show($"重连中");
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                if (!errorLogged)
                {
                    MessageLog.GetInstance.ShowEx(ex.ToString());
                    errorLogged = true;
                }
            }

            // Back off between failed attempts only; do not delay the success callback.
            if (!client.IsConnected)
            {
                await Task.Delay(3000);
            }
        }

        MessageLog.GetInstance.Show("MQTT重连成功!");
        Reconnection?.Invoke();
    }

    /// <summary>
    /// Subscribes to a single topic with exactly-once QoS.
    /// Does nothing when the client has not been created or is not connected.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    public async void MqttSubscriptionAsync(string topic)
    {
        if (client == null || !client.IsConnected)
        {
            return;
        }

        try
        {
            await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build());
        }
        catch (Exception ex)
        {
            // async void: an escaping exception could not be observed by the caller, so log it here.
            MessageLog.GetInstance.ShowEx(ex.Message);
        }
    }

    /// <summary>
    /// Subscribes to each of the given topics with exactly-once QoS, stopping at the first failure.
    /// Does nothing when the client has not been created or is not connected.
    /// </summary>
    /// <param name="topic">Topics to subscribe to.</param>
    public async void MqttSubscriptionAsync(string[] topic)
    {
        if (topic == null || client == null || !client.IsConnected)
        {
            return;
        }

        try
        {
            foreach (var item in topic)
            {
                await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(item).WithExactlyOnceQoS().Build());
            }
        }
        catch (Exception ex)
        {
            // async void: an escaping exception could not be observed by the caller, so log it here.
            MessageLog.GetInstance.ShowEx(ex.Message);
        }
    }

    /// <summary>
    /// Publishes a text payload to a topic with exactly-once QoS.
    /// Does nothing when the client has not been created or is not connected.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="content">Payload to publish.</param>
    public async void MqttPublishAsync(string topic, string content)
    {
        if (client == null || !client.IsConnected)
        {
            return;
        }

        try
        {
            // Built inside the try so that an invalid topic is logged instead of escaping an async void method.
            var msg = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(content)
                .WithExactlyOnceQoS()
                .Build();
            await client.PublishAsync(msg);
        }
        catch (Exception ex)
        {
            MessageLog.GetInstance.ShowEx(ex.Message);
        }
    }
}
}