|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- //using BPA.MQTTClient;
- //using Microsoft.Extensions.Configuration;
- //using MQTTnet.Client;
- //using MQTTnet.Client;
- //using MQTTnet;
- //using MQTTnet.Client;
- //using MQTTnet.Client.Options;
- //using MQTTnet.Client.Receiving;
- using BPASmartClient.Helper;
- using BPASmartClient.Message;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Client.Options;
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
-
- namespace BPASmartClient.MQTT
- {
/// <summary>
/// Wrapper around an MQTTnet client. It connects with user name/password credentials,
/// forwards received payloads as UTF-8 text, subscribes and publishes with exactly-once
/// QoS, and runs a reconnect loop when the client reports a disconnect.
/// </summary>
/// <remarks>
/// The public entry points keep their original <c>void</c> signatures so existing callers
/// and delegate bindings continue to compile; failures are reported through
/// <c>MessageLog</c> instead of being thrown to the caller.
/// </remarks>
public class MQTTProxy
{
    /// <summary>Initial pause after a disconnect and interval between network-state polls, in milliseconds.</summary>
    private const int NetworkPollDelayMs = 2000;

    /// <summary>Pause after every reconnect attempt, in milliseconds.</summary>
    private const int ReconnectRetryDelayMs = 3000;

    private IMqttClient client;
    private IMqttClientOptions options;

    // 1 while a reconnect loop is running, 0 otherwise. Failed attempts can raise the
    // disconnected notification again, so this keeps a second loop from starting.
    private int reconnecting;

    // Set by CloseConnect so a running reconnect loop stops retrying.
    private volatile bool closed;

    // True while the next "not connected" warning from Publish should still be logged;
    // cleared after logging so repeated publishes do not flood the log.
    private bool push = true;

    /// <summary>Invoked with the UTF-8 decoded payload of every received message.</summary>
    public Action<string> MessageRecive { get; set; }

    /// <summary>Invoked at the end of <see cref="Connect"/> when the client is connected.</summary>
    public Action Connected { get; set; }

    /// <summary>Invoked after the connection was lost and has been re-established.</summary>
    public Action LostConnect { get; set; }

    /// <summary>
    /// Connection state as tracked by this proxy: set when the client reports a
    /// connection, cleared on disconnect and by <see cref="CloseConnect"/>.
    /// </summary>
    public bool IsConnected { get; set; }

    /// <summary>
    /// Creates the MQTT client, registers its handlers and attempts to connect.
    /// A failed attempt is logged; <see cref="Connected"/> is invoked only when the
    /// client reports that it is connected afterwards.
    /// </summary>
    /// <param name="UserName">User name sent to the broker.</param>
    /// <param name="pass">Password sent to the broker.</param>
    /// <param name="IP">Broker host name or IP address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientID">Client identifier presented to the broker.</param>
    public async void Connect(string UserName, string pass, string IP, int port, string clientID)
    {
        closed = false;

        options = new MqttClientOptionsBuilder()
            .WithTcpServer(IP, port)
            .WithClientId(clientID)
            .WithCredentials(UserName, pass)
            .Build();

        client = new MqttFactory().CreateMqttClient();
        client.UseDisconnectedHandler(e => ReconnectAsync())
            .UseApplicationMessageReceivedHandler(e =>
            {
                // A message may carry no payload; deliver an empty string instead of throwing.
                var payload = e.ApplicationMessage?.Payload;
                MessageRecive?.Invoke(payload == null ? string.Empty : Encoding.UTF8.GetString(payload));
            })
            .UseConnectedHandler(e =>
            {
                IsConnected = true;
            });

        try
        {
            await client.ConnectAsync(options);
        }
        catch (Exception ex)
        {
            MessageLog.GetInstance.ShowEx(ex.Message);
            MessageLog.GetInstance.Show("mqtt连接失败!重连执行中");
        }

        if (client.IsConnected)
        {
            MessageLog.GetInstance.Show("MQTT连接成功!");
            Connected?.Invoke();
        }
    }

    /// <summary>
    /// Reconnect loop started from the client's disconnected notification. Waits until the
    /// network is reported as available, then retries the connection until it succeeds or
    /// <see cref="CloseConnect"/> is called, and finally invokes <see cref="LostConnect"/>.
    /// </summary>
    private async Task ReconnectAsync()
    {
        IsConnected = false;

        // Only one loop at a time; a notification raised by a failed attempt returns here.
        if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0)
        {
            return;
        }

        try
        {
            MessageLog.GetInstance.ShowEx("MQTT 断开连接");
            await Task.Delay(NetworkPollDelayMs).ConfigureAwait(false);

            while (!closed && !UniversalHelper.GetInstance().GetNetworkState())
            {
                await Task.Delay(NetworkPollDelayMs).ConfigureAwait(false);
            }

            // Only the first failure is logged so a long outage does not flood the log.
            bool errorLogged = false;
            while (!closed && !client.IsConnected)
            {
                try
                {
                    MessageLog.GetInstance.Show("重连中");
                    await client.ConnectAsync(options).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    if (!errorLogged)
                    {
                        MessageLog.GetInstance.ShowEx(ex.ToString());
                        errorLogged = true;
                    }
                }

                await Task.Delay(ReconnectRetryDelayMs).ConfigureAwait(false);
            }

            if (!closed && client.IsConnected)
            {
                MessageLog.GetInstance.Show("MQTT重连成功!");
                LostConnect?.Invoke();
            }
        }
        finally
        {
            // Always release the guard, even when a callback or the network probe throws;
            // otherwise no later disconnect could start a new reconnect loop.
            Interlocked.Exchange(ref reconnecting, 0);
        }
    }

    /// <summary>
    /// Subscribes to a single topic with exactly-once QoS.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    public void MqttSubscriptionAsync(string topic)
    {
        Subscrib(topic);
    }

    /// <summary>
    /// Subscribes to each given topic with exactly-once QoS. Does nothing when the client
    /// has not been created or is not connected. A failure is logged per topic and does
    /// not prevent the remaining topics from being subscribed.
    /// </summary>
    /// <param name="topic">Topics to subscribe to.</param>
    public async void Subscrib(params string[] topic)
    {
        if (topic == null || client == null || !client.IsConnected)
        {
            return;
        }

        foreach (var item in topic)
        {
            try
            {
                await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(item).WithExactlyOnceQoS().Build());
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx($"MQTT订阅失败[{item}]:{ex.Message}");
            }
        }
    }

    /// <summary>
    /// Publishes a message with exactly-once QoS. When the client has not been created or
    /// is not connected, a "not connected" warning is logged once until the next publish
    /// that finds the client connected.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="content">Message payload.</param>
    public async void Publish(string topic, string content)
    {
        if (client != null && client.IsConnected)
        {
            push = true;
            try
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(content)
                    .WithExactlyOnceQoS()
                    .Build();
                await client.PublishAsync(message);
            }
            catch (Exception ex)
            {
                MessageLog.GetInstance.ShowEx($"MQTT发布失败:{ex.Message}");
            }
        }

        // Checked again after the attempt so a connection lost while publishing is reported too.
        if ((client == null || !client.IsConnected) && push)
        {
            MessageLog.GetInstance.ShowEx("MQTT未连接");
            push = false;
        }
    }

    /// <summary>
    /// Stops any running reconnect loop and disposes the client. Safe to call when
    /// <see cref="Connect"/> was never called.
    /// </summary>
    public void CloseConnect()
    {
        closed = true;
        client?.Dispose();
        IsConnected = false;
    }
}
- }
|