using BPASmartClient.Helper;
using BPASmartClient.Message;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace HBLConsole.Communication
{
    /// <summary>
    /// Thin wrapper around an MQTTnet client: connects, reconnects after a connection
    /// loss, and exposes fire-and-forget subscribe / publish helpers.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the base type is written here exactly as it appears in the file;
    /// it looks like a generic argument may have been lost (e.g. Singleton&lt;MqttHelper&gt;) - confirm.
    /// </remarks>
    public class MqttHelper:Singleton
    {
        private IMqttClient client;
        IMqttClientOptions options;

        // 1 while a reconnect loop is running, 0 otherwise. Guards against overlapping loops,
        // because every failed ConnectAsync raises the disconnected handler again.
        private int reconnecting;

        // Set by Close() so that a deliberate disconnect does not start the reconnect loop.
        private volatile bool closeRequested;

        /// <summary>
        /// Invoked for every application message received from the broker.
        /// </summary>
        /// <remarks>
        /// Typed with the MQTTnet event args because the received-handler forwards its
        /// argument to this delegate; a parameterless <see cref="Action"/> cannot accept it.
        /// </remarks>
        public Action<MqttApplicationMessageReceivedEventArgs> MqttReceive { get; set; }

        /// <summary>
        /// Invoked once after <see cref="MqttInitAsync"/> finds the client connected.
        /// </summary>
        public Action ConnectOk { get; set; }

        /// <summary>
        /// Invoked each time the connection has been re-established after a loss.
        /// </summary>
        public Action Reconnection { get; set; }

        /// <summary>
        /// Builds the client options, creates the client, wires the handlers and attempts the first connection.
        /// </summary>
        /// <param name="UserName">User name sent to the broker.</param>
        /// <param name="pass">Password sent to the broker.</param>
        /// <param name="IP">Broker host name or IP address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientID">MQTT client identifier.</param>
        /// <remarks>
        /// Fire-and-forget (<c>async void</c>) to keep the existing call sites working.
        /// A failed first connection is logged; recovery is left to the disconnected handler.
        /// </remarks>
        public async void MqttInitAsync(string UserName, string pass, string IP, int port, string clientID)
        {
            closeRequested = false;

            options = new MqttClientOptionsBuilder()
                .WithTcpServer(IP, port)
                .WithClientId(clientID)
                .WithCredentials(UserName, pass)
                .Build();

            client = new MqttFactory().CreateMqttClient();
            client.UseDisconnectedHandler(async c =>
            {
                // Awaited so the handler completes only after the reconnect attempt finishes,
                // as the blocking implementation did - but without tying up the thread.
                await ReconnectAsync();
            }).UseApplicationMessageReceivedHandler(c =>
            {
                // The subscriber is optional; a message arriving before one is attached must not throw.
                MqttReceive?.Invoke(c);
            }).UseConnectedHandler((e) =>
            {
                MessageLog.GetInstance.Show($"连接成功");
            });

            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.Message);
                MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
            }

            if (client.IsConnected)
            {
                MessageLog.GetInstance.Show("MQTT连接成功!");
                ConnectOk?.Invoke();
            }
        }

        /// <summary>
        /// Disconnects from the broker and suppresses the automatic reconnect.
        /// </summary>
        /// <remarks>
        /// Does nothing when the client has not been created yet. Failures are logged rather
        /// than thrown, because an exception escaping an <c>async void</c> method is unobservable by the caller.
        /// </remarks>
        public async void Close()
        {
            if (client == null)
            {
                return;
            }

            // Must be set before disconnecting: the disconnected handler also fires for a
            // client-initiated disconnect and would otherwise reconnect immediately.
            closeRequested = true;

            try
            {
                await client.DisconnectAsync();
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.ToString());
            }
        }

        /// <summary>
        /// Waits for the network to come back, then retries the connection until it succeeds
        /// or <see cref="Close"/> is requested. Only one loop runs at a time.
        /// </summary>
        private async Task ReconnectAsync()
        {
            if (closeRequested)
            {
                return;
            }

            // Atomic test-and-set: nested handler invocations caused by failed retries exit here.
            if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0)
            {
                return;
            }

            try
            {
                await Task.Delay(2000);
                while (!UniversalHelper.GetInstance().GetNetworkState())
                {
                    await Task.Delay(2000);
                }

                MessageLog.GetInstance.Show($"断开连接");

                // Log only the first failure of this cycle to avoid flooding the log.
                bool errorLogged = false;
                while (!client.IsConnected && !closeRequested)
                {
                    try
                    {
                        MessageLog.GetInstance.Show($"重连中");
                        await client.ConnectAsync(options);
                    }
                    catch (Exception ex)
                    {
                        if (!errorLogged)
                        {
                            MessageLog.GetInstance.ShowEx(ex.ToString());
                            errorLogged = true;
                        }
                    }

                    if (client.IsConnected)
                    {
                        break;
                    }

                    // Back off only after a failed attempt so success is reported without delay.
                    await Task.Delay(3000);
                }

                if (client.IsConnected)
                {
                    MessageLog.GetInstance.Show("MQTT重连成功!");
                    Reconnection?.Invoke();
                }
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.ToString());
            }
            finally
            {
                // Always release the guard; otherwise one exception would disable reconnection for good.
                Interlocked.Exchange(ref reconnecting, 0);
            }
        }

        /// <summary>
        /// Subscribes to a single topic with exactly-once QoS.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <remarks>Silently does nothing when the client is missing or not connected; failures are logged.</remarks>
        public async void MqttSubscriptionAsync(string topic)
        {
            await SubscribeCoreAsync(topic);
        }

        /// <summary>
        /// Subscribes to several topics with exactly-once QoS, one after another.
        /// </summary>
        /// <param name="topic">Topics to subscribe to; a null array is ignored.</param>
        /// <remarks>A failure on one topic is logged and does not prevent the remaining topics from being tried.</remarks>
        public async void MqttSubscriptionAsync(string[] topic)
        {
            if (topic == null)
            {
                return;
            }

            foreach (var item in topic)
            {
                await SubscribeCoreAsync(item);
            }
        }

        /// <summary>
        /// Shared subscribe routine: never throws, logs any failure.
        /// </summary>
        private async Task SubscribeCoreAsync(string topic)
        {
            if (client == null || !client.IsConnected || string.IsNullOrEmpty(topic))
            {
                return;
            }

            try
            {
                await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).WithExactlyOnceQoS().Build());
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.ToString());
            }
        }

        /// <summary>
        /// Publishes a text payload to a topic with exactly-once QoS.
        /// </summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="content">Payload text.</param>
        /// <remarks>Silently does nothing when the client is missing or not connected; failures are logged.</remarks>
        public async void MqttPublishAsync(string topic, string content)
        {
            if (client == null || !client.IsConnected)
            {
                return;
            }

            try
            {
                // Built inside the try: a builder exception must not escape this async void method.
                var msg = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(content).WithExactlyOnceQoS().Build();
                await client.PublishAsync(msg);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx(ex.ToString());
            }
        }
    }
}