Procházet zdrojové kódy

Merge pull request #66 from JTrotta/develop

Add support to not persistent Queue
release/3.x.x
Christian před 7 roky
committed by GitHub
rodič
revize
680bc4658f
23 změnil soubory, kde provedl 549 přidání a 43 odebrání
  1. +6
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
  2. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  3. +5
    -0
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  4. +7
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
  5. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  6. +5
    -0
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  7. +1
    -1
      MQTTnet.Core/Client/IMqttClient.cs
  8. +2
    -0
      MQTTnet.Core/Client/IMqttClientFactory.cs
  9. +9
    -0
      MQTTnet.Core/Client/IMqttClientManaged.cs
  10. +12
    -0
      MQTTnet.Core/Client/IMqttClientQueuedStorage.cs
  11. +1
    -1
      MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs
  12. +6
    -5
      MQTTnet.Core/Client/MqttClient.cs
  13. +163
    -0
      MQTTnet.Core/Client/MqttClientManaged .cs
  14. +12
    -0
      MQTTnet.Core/Client/MqttClientManagedOptions.cs
  15. +12
    -12
      MQTTnet.Core/Client/MqttClientOptions.cs
  16. +84
    -0
      MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs
  17. +24
    -2
      MQTTnet.Core/Client/MqttClientTcpOptions.cs
  18. +24
    -2
      MQTTnet.Core/Client/MqttClientWebSocketOptions.cs
  19. +0
    -8
      MQTTnet.Core/MQTTnet.Core.csproj
  20. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs
  21. +172
    -6
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  22. binární
     
  23. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 6
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs Zobrazit soubor

@@ -7,7 +7,7 @@ namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

@@ -21,6 +21,11 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs Zobrazit soubor

@@ -121,7 +121,7 @@ namespace MQTTnet.Implementations
return _options.TlsOptions.AllowUntrustedCertificates;
}

private static X509CertificateCollection LoadCertificates(MqttClientOptions options)
private static X509CertificateCollection LoadCertificates(IMqttClientOptions options)
{
var certificates = new X509CertificateCollection();
if (options.TlsOptions.Certificates == null)


+ 5
- 0
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs Zobrazit soubor

@@ -9,5 +9,10 @@ namespace MQTTnet
{
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 7
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs Zobrazit soubor

@@ -7,7 +7,7 @@ namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

@@ -21,6 +21,12 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}


throw new NotSupportedException();
}
}

+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs Zobrazit soubor

@@ -89,7 +89,7 @@ namespace MQTTnet.Implementations
RawReceiveStream = ReceiveStream;
}

private static Certificate LoadCertificate(MqttClientOptions options)
private static Certificate LoadCertificate(IMqttClientOptions options)
{
if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any())
{


+ 5
- 0
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs Zobrazit soubor

@@ -9,5 +9,10 @@ namespace MQTTnet
{
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 1
- 1
MQTTnet.Core/Client/IMqttClient.cs Zobrazit soubor

@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client
event EventHandler Connected;
event EventHandler Disconnected;

Task ConnectAsync(MqttClientOptions options);
Task ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync();

Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters);


+ 2
- 0
MQTTnet.Core/Client/IMqttClientFactory.cs Zobrazit soubor

@@ -3,5 +3,7 @@
public interface IMqttClientFactory
{
IMqttClient CreateMqttClient();

IMqttClientManaged CreateMqttManagedClient();
}
}

+ 9
- 0
MQTTnet.Core/Client/IMqttClientManaged.cs Zobrazit soubor

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientManaged : IMqttClient
{
//Task ConnectAsync(MqttClientManagedOptions options);
}
}

+ 12
- 0
MQTTnet.Core/Client/IMqttClientQueuedStorage.cs Zobrazit soubor

@@ -0,0 +1,12 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientQueuedStorage
{
Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages);

Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync();
}
}

+ 1
- 1
MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs Zobrazit soubor

@@ -4,6 +4,6 @@ namespace MQTTnet.Core.Client
{
public interface IMqttCommunicationAdapterFactory
{
IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options);
IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options);
}
}

+ 6
- 5
MQTTnet.Core/Client/MqttClient.cs Zobrazit soubor

@@ -18,11 +18,11 @@ namespace MQTTnet.Core.Client
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory;

private MqttClientOptions _options;
private IMqttClientOptions _options;
private bool _isReceivingPackets;
private int _latestPacketIdentifier;
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
internal CancellationTokenSource _cancellationTokenSource;
internal IMqttCommunicationAdapter _adapter;

public MqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
@@ -35,7 +35,8 @@ namespace MQTTnet.Core.Client

public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;

public async Task ConnectAsync(MqttClientOptions options)

public async Task ConnectAsync(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

@@ -344,7 +345,7 @@ namespace MQTTnet.Core.Client
return _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, packet);
}

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
internal async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout);
await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);


+ 163
- 0
MQTTnet.Core/Client/MqttClientManaged .cs Zobrazit soubor

@@ -0,0 +1,163 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Client
{
public class MqttClientManaged: IMqttClientManaged
{
private MqttClientManagedOptions _options;
private int _latestPacketIdentifier;
private readonly BlockingCollection<MqttApplicationMessage> _inflightQueue;
private bool _usePersistance = false;
private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager;
private readonly MqttClient _baseMqttClient;

public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
_baseMqttClient = new MqttClient(communicationChannelFactory);
_baseMqttClient.Connected += BaseMqttClient_Connected;
_baseMqttClient.Disconnected += BaseMqttClient_Disconnected;
_baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived;
_inflightQueue = new BlockingCollection<MqttApplicationMessage>();
}

private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}

private void BaseMqttClient_Disconnected(object sender, EventArgs e)
{
Disconnected?.Invoke(this, e);
}

private void BaseMqttClient_Connected(object sender, EventArgs e)
{
Connected?.Invoke(this, e);
}

public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected => _baseMqttClient.IsConnected;


public async Task ConnectAsync(IMqttClientOptions options)
{
//TODO VERY BAD
_options = options as MqttClientManagedOptions;
this._usePersistance = _options.Storage != null;
await _baseMqttClient.ConnectAsync(options);
SetupOutgoingPacketProcessingAsync();

//load persistentMessages
if (_usePersistance)
{
if (_persistentMessagesManager == null)
_persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options);
await _persistentMessagesManager.LoadMessagesAsync();
await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
}
}

public async Task DisconnectAsync()
{
await _baseMqttClient.DisconnectAsync();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
await _baseMqttClient.UnsubscribeAsync(topicFilters);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
await InternalPublishAsync(applicationMessages, true);
}

private async Task InternalPublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages, bool appendIfUsePersistance)
{
ThrowIfNotConnected();

foreach (var applicationMessage in applicationMessages)
{
if (_usePersistance && appendIfUsePersistance)
await _persistentMessagesManager.SaveMessageAsync(applicationMessage);

_inflightQueue.Add(applicationMessage);
}
}

public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await _baseMqttClient.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}

private void SetupOutgoingPacketProcessingAsync()
{
Task.Factory.StartNew(
() => SendPackets(_baseMqttClient._cancellationTokenSource.Token),
_baseMqttClient._cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
}

private async Task SendPackets(CancellationToken cancellationToken)
{
MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
MqttApplicationMessage messageInQueue = null;

try
{
while (!cancellationToken.IsCancellationRequested)
{
messageInQueue = _inflightQueue.Take();
await _baseMqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
if (_usePersistance)
await _persistentMessagesManager.Remove(messageInQueue);
}
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
//message not send, equeue it again
if (messageInQueue != null)
_inflightQueue.Add(messageInQueue);
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
await DisconnectAsync().ConfigureAwait(false);
}
finally
{
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
}
}
}
}

+ 12
- 0
MQTTnet.Core/Client/MqttClientManagedOptions.cs Zobrazit soubor

@@ -0,0 +1,12 @@

using System;

namespace MQTTnet.Core.Client
{
public class MqttClientManagedOptions: MqttClientTcpOptions
{
public bool UseAutoReconnect { get; set; }
public TimeSpan AutoReconnectDelay { get; set; }
public IMqttClientQueuedStorage Storage { get; set; }
}
}

+ 12
- 12
MQTTnet.Core/Client/MqttClientOptions.cs Zobrazit soubor

@@ -1,26 +1,26 @@
using System;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Serializer;
using System;

namespace MQTTnet.Core.Client
{
public abstract class MqttClientOptions
public interface IMqttClientOptions
{
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();
MqttClientTlsOptions TlsOptions { get; set; }

public MqttApplicationMessage WillMessage { get; set; }
MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }
string UserName { get; set; }

public string Password { get; set; }
string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);
string ClientId { get; set; }

public bool CleanSession { get; set; } = true;
bool CleanSession { get; set; }

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);
TimeSpan KeepAlivePeriod { get; set; }

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
TimeSpan DefaultCommunicationTimeout { get; set; }

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
MqttProtocolVersion ProtocolVersion { get; set; }
}
}

+ 84
- 0
MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs Zobrazit soubor

@@ -0,0 +1,84 @@
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Packets;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public class MqttClientQueuedPersistentMessagesManager
{
private readonly IList<MqttApplicationMessage> _persistedMessages = new List<MqttApplicationMessage>();
private readonly MqttClientManagedOptions _options;

public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public async Task LoadMessagesAsync()
{
try
{
var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync();
lock (_persistedMessages)
{
_persistedMessages.Clear();
foreach (var persistentMessage in persistentMessages)
{
_persistedMessages.Add(persistentMessage);
}
}
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages.");
}
}

public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage)
{
if (applicationMessage != null)
{
lock (_persistedMessages)
{
_persistedMessages.Add(applicationMessage);
}
}
try
{
if (_options.Storage != null)
{
await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages);
}
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages.");
}
}

public List<MqttApplicationMessage> GetMessages()
{
var persistedMessages = new List<MqttApplicationMessage>();
lock (_persistedMessages)
{
foreach (var persistedMessage in _persistedMessages)
{
persistedMessages.Add(persistedMessage);
}
}

return persistedMessages;
}

public async Task Remove(MqttApplicationMessage message)
{
lock (_persistedMessages)
_persistedMessages.Remove(message);

await SaveMessageAsync(null);
}
}
}

+ 24
- 2
MQTTnet.Core/Client/MqttClientTcpOptions.cs Zobrazit soubor

@@ -1,7 +1,29 @@
namespace MQTTnet.Core.Client
using MQTTnet.Core.Serializer;
using System;

namespace MQTTnet.Core.Client
{
public class MqttClientTcpOptions : MqttClientOptions
public class MqttClientTcpOptions : IMqttClientOptions
{
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();

public MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }

public string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);

public bool CleanSession { get; set; } = true;

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;


public string Server { get; set; }

public int? Port { get; set; }


+ 24
- 2
MQTTnet.Core/Client/MqttClientWebSocketOptions.cs Zobrazit soubor

@@ -1,7 +1,29 @@
namespace MQTTnet.Core.Client
using System;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Client
{
public class MqttClientWebSocketOptions : MqttClientOptions
public class MqttClientWebSocketOptions : IMqttClientOptions
{
public string Uri { get; set; }

public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();

public MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }

public string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);

public bool CleanSession { get; set; } = true;

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

}
}

+ 0
- 8
MQTTnet.Core/MQTTnet.Core.csproj Zobrazit soubor

@@ -22,14 +22,6 @@
<PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|AnyCPU'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|x64'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x86'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x86'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|x86'" />

</Project>

+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs Zobrazit soubor

@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests
_adapter = adapter;
}

public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
return _adapter;
}


+ 172
- 6
Tests/MQTTnet.TestApp.NetCore/Program.cs Zobrazit soubor

@@ -19,26 +19,127 @@ namespace MQTTnet.TestApp.NetCore
{
Console.WriteLine("MQTTnet - TestApp.NetFramework");
Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server");
Console.WriteLine("3 = Start performance test");
Console.WriteLine("2 = Start queued client");
Console.WriteLine("3 = Start server");
Console.WriteLine("4 = Start performance test");
var pressedKey = Console.ReadKey(true);
if (pressedKey.Key == ConsoleKey.D1)
{
Task.Run(() => RunClientAsync(args));
Thread.Sleep(Timeout.Infinite);
}
else if (pressedKey.Key == ConsoleKey.D2)
if (pressedKey.Key == ConsoleKey.D2)
{
Task.Run(() => RunServerAsync(args));
Task.Run(() => RunClientQueuedAsync(args));
Thread.Sleep(Timeout.Infinite);
}
else if (pressedKey.Key == ConsoleKey.D3)
{
Task.Run(() => RunServerAsync(args));
Thread.Sleep(Timeout.Infinite);
}
else if (pressedKey.Key == ConsoleKey.D4)
{
Task.Run(PerformanceTest.RunAsync);
Thread.Sleep(Timeout.Infinite);
}
}

private static async Task RunClientQueuedAsync(string[] arguments)
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
}
};

try
{
var options = new MqttClientManagedOptions
{
Server = "192.168.0.14",
ClientId = "XYZ",
CleanSession = true,
UserName = "lobu",
Password = "passworda",
KeepAlivePeriod = TimeSpan.FromSeconds(31),
DefaultCommunicationTimeout = TimeSpan.FromSeconds(20),
Storage = new TestStorage(),
};

var client = new MqttClientFactory().CreateMqttManagedClient();
client.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
Console.WriteLine();
};

client.Connected += async (s, e) =>
{
Console.WriteLine("### CONNECTED WITH SERVER ###");

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
});

Console.WriteLine("### SUBSCRIBED ###");
};

client.Disconnected += async (s, e) =>
{
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(5));

try
{
await client.ConnectAsync(options);
}
catch
{
Console.WriteLine("### RECONNECTING FAILED ###");
}
};

try
{
await client.ConnectAsync(options);
}
catch (Exception exception)
{
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception);
}

Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###");

int i = 0;
while (true)
{
Console.ReadLine();
i++;
var applicationMessage = new MqttApplicationMessage(
"PLNMAIN",
Encoding.UTF8.GetBytes(string.Format("Hello World {0}", i)),
MqttQualityOfServiceLevel.ExactlyOnce,
false
);

await client.PublishAsync(applicationMessage);
}
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
}

private static async Task RunClientAsync(string[] arguments)
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
@@ -56,7 +157,7 @@ namespace MQTTnet.TestApp.NetCore
{
Uri = "localhost",
ClientId = "XYZ",
CleanSession = true
CleanSession = true,
};

var client = new MqttClientFactory().CreateMqttClient();
@@ -76,7 +177,7 @@ namespace MQTTnet.TestApp.NetCore

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
new TopicFilter("#", MqttQualityOfServiceLevel.ExactlyOnce)
});

Console.WriteLine("### SUBSCRIBED ###");
@@ -175,5 +276,70 @@ namespace MQTTnet.TestApp.NetCore

Console.ReadLine();
}

[Serializable]
public sealed class TemporaryApplicationMessage
{
public TemporaryApplicationMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain)
{
Topic = topic ?? throw new ArgumentNullException(nameof(topic));
Payload = payload ?? throw new ArgumentNullException(nameof(payload));
QualityOfServiceLevel = qualityOfServiceLevel;
Retain = retain;
}

public string Topic { get; }

public byte[] Payload { get; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; }

public bool Retain { get; }
}

private class TestStorage : IMqttClientQueuedStorage
{
string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin");
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();

public Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync()
{
//deserialize
// MqttApplicationMessage is not serializable
if (System.IO.File.Exists(serializationFile))
{
using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Open))
{
var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();

var temp = (List<TemporaryApplicationMessage>)bformatter.Deserialize(stream);
foreach (var a in temp)
{
_messages.Add(new MqttApplicationMessage(a.Topic, a.Payload, a.QualityOfServiceLevel, a.Retain));
}
}
}
return Task.FromResult(_messages);
}

public Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages)
{
_messages = messages;
//serialize
// MqttApplicationMessage is not serializable
using (System.IO.Stream stream = System.IO.File.Open(serializationFile, System.IO.FileMode.Create))
{
var bformatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
IList<TemporaryApplicationMessage> temp = new List<TemporaryApplicationMessage>();
foreach (var m in _messages)
{
temp.Add(new TemporaryApplicationMessage(m.Topic, m.Payload, m.QualityOfServiceLevel, m.Retain));
}
bformatter.Serialize(stream, temp);
}

return Task.FromResult(0);
}
}
}
}

binární
Zobrazit soubor


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs Zobrazit soubor

@@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Connect(object sender, RoutedEventArgs e)
{
MqttClientOptions options = null;
IMqttClientOptions options = null;
if (UseTcp.IsChecked == true)
{
options = new MqttClientTcpOptions


Načítá se…
Zrušit
Uložit