using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Options; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace BPASmartClient.Compiler { public class MQTT { //private volatile static MQTT _Instance; //public static MQTT GetInstance => _Instance ?? (_Instance = new MQTT()); //private MQTT() { } public IMqttClient client; IMqttClientOptions options; /// /// MQTT 接收消息 /// public Action MqttReceive { get; set; } /// /// MQTT 连接成功 /// public Action ConnectOk { get; set; } /// /// 重连成功 /// public Action Reconnection { get; set; } public async void MqttInitAsync(string UserName,string pass,string IP,int port,string clientID) { p1: options = new MqttClientOptionsBuilder().WithTcpServer(IP,port).WithClientId(clientID).WithCredentials(UserName,pass).Build(); client = new MqttFactory().CreateMqttClient(); client.UseDisconnectedHandler(async c => { Thread.Sleep(2000); //while (!Device.DataBus.内存数据缓存.DataBus.GetInstance().NetworkConnectState) //{ // Thread.Sleep(2000); //} //Device.DataBus.内存数据缓存.DataBus.GetInstance().MQTTConnectState = false; //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"断开连接"); try { //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"重连中"); await client.ConnectAsync(options); } catch (Exception ex) { //logHelper.GetLogConfigInstance().WriteLog(LogLevel.ERROR,ex.Message); } if (client.IsConnected) { //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"MQTT重连成功"); if (Reconnection != null) Reconnection(); } }).UseApplicationMessageReceivedHandler(c => { MqttReceive(c); }).UseConnectedHandler((e) => { //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"连接成功"); }); try { await client.ConnectAsync(options); } catch (Exception ex) { //logHelper.GetLogConfigInstance().WriteLog(LogLevel.ERROR,ex.Message); } if (!client.IsConnected) { Thread.Sleep(2000); //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"mqtt连接失败!重连执行中"); goto p1; } Thread.Sleep(2000); //logHelper.GetLogConfigInstance().WriteLog(LogLevel.INFO,$"MQTT连接成功!"); if (ConnectOk != null) ConnectOk(); } /// /// Mqtt 订阅 /// /// 需要订阅的主题 public async void MqttSubscriptionAsync(string topic) { if (client != null) { if (client.IsConnected) { try { var result = await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build()); } catch { } } } } /// /// Mqtt 发布 /// /// 需要发布的主题 /// 需要发布的内容 public async void MqttPublishAsync(string topic,string content) { if (client != null) { if (client.IsConnected) { var msg = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(content).WithExactlyOnceQoS().Build(); try { var result = await client.PublishAsync(msg); } catch { } } } } private static readonly object sendMessageLock = new object(); } }