Browse Source

Refactor manage client

release/3.x.x
Christian Kratky 7 years ago
parent
commit
24c35de330
17 changed files with 477 additions and 100 deletions
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +2
    -2
      MQTTnet.Core/Client/IMqttClient.cs
  3. +8
    -5
      MQTTnet.Core/Client/MqttClient.cs
  4. +12
    -0
      MQTTnet.Core/Client/MqttClientConnectResult.cs
  5. +14
    -0
      MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs
  6. +0
    -1
      MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs
  7. +237
    -80
      MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
  8. +30
    -0
      MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs
  9. +0
    -1
      MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs
  10. +75
    -0
      MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs
  11. +9
    -0
      MQTTnet.Core/ManagedClient/ReconnectionResult.cs
  12. +10
    -0
      MQTTnet.Core/Packets/TopicFilter.cs
  13. +2
    -1
      MQTTnet.Core/Server/IMqttServer.cs
  14. +2
    -2
      MQTTnet.sln
  15. +1
    -1
      Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
  16. +62
    -0
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  17. +12
    -7
      Tests/MQTTnet.TestApp.NetCore/Program.cs

+ 1
- 0
Build/MQTTnet.nuspec View File

@@ -15,6 +15,7 @@
* [Core] Added a builder for application messages using a fluent API * [Core] Added a builder for application messages using a fluent API
* [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta) * [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta)
* [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl) * [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl)
* [Client] The session state response from the server is now returned in the _ConnectAsync_ method and also part of the _Connected_ event args
</releaseNotes> </releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M</tags> <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M</tags>


+ 2
- 2
MQTTnet.Core/Client/IMqttClient.cs View File

@@ -10,10 +10,10 @@ namespace MQTTnet.Core.Client
bool IsConnected { get; } bool IsConnected { get; }


event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler Connected;
event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;


Task ConnectAsync(IMqttClientOptions options);
Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync(); Task DisconnectAsync();


Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters); Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters);


+ 8
- 5
MQTTnet.Core/Client/MqttClient.cs View File

@@ -33,13 +33,13 @@ namespace MQTTnet.Core.Client
_packetDispatcher = new MqttPacketDispatcher(trace); _packetDispatcher = new MqttPacketDispatcher(trace);
} }


public event EventHandler Connected;
public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public bool IsConnected { get; private set; } public bool IsConnected { get; private set; }


public async Task ConnectAsync(IMqttClientOptions options)
public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));


@@ -59,7 +59,7 @@ namespace MQTTnet.Core.Client
_trace.Verbose(nameof(MqttClient), "Connection with server established."); _trace.Verbose(nameof(MqttClient), "Connection with server established.");


await SetupIncomingPacketProcessingAsync(); await SetupIncomingPacketProcessingAsync();
await AuthenticateAsync(options.WillMessage);
var connectResponse = await AuthenticateAsync(options.WillMessage);


_trace.Verbose(nameof(MqttClient), "MQTT connection with server established."); _trace.Verbose(nameof(MqttClient), "MQTT connection with server established.");


@@ -69,7 +69,8 @@ namespace MQTTnet.Core.Client
} }


IsConnected = true; IsConnected = true;
Connected?.Invoke(this, EventArgs.Empty);
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
return new MqttClientConnectResult(connectResponse.IsSessionPresent);
} }
catch (Exception) catch (Exception)
{ {
@@ -178,7 +179,7 @@ namespace MQTTnet.Core.Client
} }
} }


private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
{ {
var connectPacket = new MqttConnectPacket var connectPacket = new MqttConnectPacket
{ {
@@ -195,6 +196,8 @@ namespace MQTTnet.Core.Client
{ {
throw new MqttConnectingFailedException(response.ConnectReturnCode); throw new MqttConnectingFailedException(response.ConnectReturnCode);
} }

return response;
} }


private async Task SetupIncomingPacketProcessingAsync() private async Task SetupIncomingPacketProcessingAsync()


+ 12
- 0
MQTTnet.Core/Client/MqttClientConnectResult.cs View File

@@ -0,0 +1,12 @@
namespace MQTTnet.Core.Client
{
public class MqttClientConnectResult
{
public MqttClientConnectResult(bool isSessionPresent)
{
IsSessionPresent = isSessionPresent;
}

public bool IsSessionPresent { get; }
}
}

+ 14
- 0
MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs View File

@@ -0,0 +1,14 @@
using System;

namespace MQTTnet.Core.Client
{
public class MqttClientConnectedEventArgs : EventArgs
{
public MqttClientConnectedEventArgs(bool isSessionPresent)
{
IsSessionPresent = isSessionPresent;
}

public bool IsSessionPresent { get; }
}
}

+ 0
- 1
MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs View File

@@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient
{ {
IMqttClientOptions ClientOptions { get; } IMqttClientOptions ClientOptions { get; }


bool UseAutoReconnect { get; }
TimeSpan AutoReconnectDelay { get; } TimeSpan AutoReconnectDelay { get; }


IManagedMqttClientStorage Storage { get; } IManagedMqttClientStorage Storage { get; }


+ 237
- 80
MQTTnet.Core/ManagedClient/ManagedMqttClient.cs View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
@@ -7,17 +8,24 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions; using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;


namespace MQTTnet.Core.ManagedClient namespace MQTTnet.Core.ManagedClient
{ {
public class ManagedMqttClient public class ManagedMqttClient
{ {
private readonly List<MqttApplicationMessage> _messageQueue = new List<MqttApplicationMessage>();
private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false);
private readonly MqttClient _mqttClient;
private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
private readonly HashSet<TopicFilter> _subscriptions = new HashSet<TopicFilter>();

private readonly IMqttClient _mqttClient;
private readonly MqttNetTrace _trace; private readonly MqttNetTrace _trace;


private CancellationTokenSource _connectionCancellationToken;
private CancellationTokenSource _publishingCancellationToken;

private IManagedMqttClientOptions _options; private IManagedMqttClientOptions _options;
private bool _subscriptionsNotPushed;
public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace)
{ {
@@ -30,120 +38,269 @@ namespace MQTTnet.Core.ManagedClient
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
} }


public event EventHandler Connected;
public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public bool IsConnected => _mqttClient.IsConnected; public bool IsConnected => _mqttClient.IsConnected;



public void Start(IManagedMqttClientOptions options)
public async Task StartAsync(IManagedMqttClientOptions options)
{ {
if (options == null) throw new ArgumentNullException(nameof(options)); if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options)); if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));


if (!options.ClientOptions.CleanSession)
{
throw new NotSupportedException("The managed client does not support existing sessions.");
}

if (_connectionCancellationToken != null)
{
throw new InvalidOperationException("The managed client is already started.");
}

_options = options;
await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);


if (_options.Storage != null)
{
var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
foreach (var loadedMessage in loadedMessages)
{
_messageQueue.Add(loadedMessage);
}
}

_connectionCancellationToken = new CancellationTokenSource();
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

_trace.Information(nameof(ManagedMqttClient), "Started");
} }


public void Stop()
public Task StopAsync()
{ {
_connectionCancellationToken?.Cancel(false);
_connectionCancellationToken = null;

while (_messageQueue.Any())
{
_messageQueue.Take();
}

return Task.FromResult(0);
} }


public async Task ConnectAsync(IManagedMqttClientOptions options)
public Task EnqueueAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{ {
//////TODO VERY BAD
////_options = options as ManagedMqttClientTcpOptions;
////this._usePersistance = _options.Storage != null;
////await _mqttClient.ConnectAsync(options);
////SetupOutgoingPacketProcessingAsync();

//////load persistentMessages
////////if (_usePersistance)
////////{
//////// if (_persistentMessagesManager == null)
//////// _persistentMessagesManager = new ManagedMqttClientMessagesManager(_options);
//////// await _persistentMessagesManager.LoadMessagesAsync();
//////// await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
////////}
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

foreach (var applicationMessage in applicationMessages)
{
_messageQueue.Add(applicationMessage);
}

return Task.FromResult(0);
} }


public async Task DisconnectAsync()
public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{ {
await _mqttClient.DisconnectAsync();
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

lock (_subscriptions)
{
foreach (var topicFilter in topicFilters)
{
if (_subscriptions.Add(topicFilter))
{
_subscriptionsNotPushed = true;
}
}
}

return Task.FromResult(0);
} }


public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
public Task UnsubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{ {
// TODO: Move all subscriptions to list an subscribe after connection has lost. But only if server session is new.
await _mqttClient.UnsubscribeAsync(topicFilters);
lock (_subscriptions)
{
foreach (var topicFilter in topicFilters)
{
if (_subscriptions.Remove(topicFilter))
{
_subscriptionsNotPushed = true;
}
}
}

return Task.FromResult(0);
} }


public void Enqueue(IEnumerable<MqttApplicationMessage> applicationMessages)
private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{ {
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));
try
{
while (!cancellationToken.IsCancellationRequested)
{
var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
if (connectionState == ReconnectionResult.NotConnected)
{
_publishingCancellationToken?.Cancel(false);
_publishingCancellationToken = null;


ThrowIfNotConnected();
await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
continue;
}


_messageQueue.AddRange(applicationMessages);
_options.Storage?.SaveQueuedMessagesAsync(_messageQueue.ToList());
if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
{
await PushSubscriptionsAsync();


_messageQueueGate.Set();
_publishingCancellationToken = new CancellationTokenSource();

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

continue;
}

if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource)
}
}
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
_trace.Warning(nameof(ManagedMqttClient), exception, "Communication exception while maintaining connection.");
}
catch (Exception exception)
{
_trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while maintaining connection.");
}
finally
{
await _mqttClient.DisconnectAsync().ConfigureAwait(false);
_trace.Information(nameof(ManagedMqttClient), "Stopped");
}
} }
public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)

private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var message = _messageQueue.Take(cancellationToken);
if (message == null)
{
continue;
}

if (cancellationToken.IsCancellationRequested)
{
continue;
}

await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
}
finally
{
_trace.Information(nameof(ManagedMqttClient), "Stopped publishing messages");
}
}

private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
{ {
return await _mqttClient.SubscribeAsync(topicFilters);
try
{
await _mqttClient.PublishAsync(message).ConfigureAwait(false);
}
catch (MqttCommunicationException exception)
{
_trace.Warning(nameof(ManagedMqttClient), exception, "Publishing application message failed.");

if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
_messageQueue.Add(message);
}
}
catch (Exception exception)
{
_trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while publishing queued application message.");
}
} }


private void ThrowIfNotConnected()
private async Task SaveAsync(List<MqttApplicationMessage> messages)
{ {
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}
if (messages == null)
{
return;
}

var storage = _options.Storage;
if (storage != null)
{
return;
}

await _options.Storage.SaveQueuedMessagesAsync(messages);
}


private void SetupOutgoingPacketProcessingAsync()
private async Task PushSubscriptionsAsync()
{ {
//Task.Factory.StartNew(
// () => SendPackets(_mqttClient._cancellationTokenSource.Token),
// _mqttClient._cancellationTokenSource.Token,
// TaskCreationOptions.LongRunning,
// TaskScheduler.Default).ConfigureAwait(false);
_trace.Information(nameof(ManagedMqttClient), "Synchronizing subscriptions");

List<TopicFilter> subscriptions;
lock (_subscriptions)
{
subscriptions = _subscriptions.ToList();
_subscriptionsNotPushed = false;
}

if (!_subscriptions.Any())
{
return;
}

try
{
await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
}
catch (Exception exception)
{
_trace.Warning(nameof(ManagedMqttClient), exception, "Synchronizing subscriptions failed");
_subscriptionsNotPushed = true;
}
} }


private async Task SendPackets(CancellationToken cancellationToken)
private async Task<ReconnectionResult> ReconnectIfRequiredAsync()
{ {
//MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
//MqttApplicationMessage messageInQueue = null;

//try
//{
// while (!cancellationToken.IsCancellationRequested)
// {
// messageInQueue = _inflightQueue.Take();
// await _mqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
// if (_usePersistance)
// await _persistentMessagesManager.Remove(messageInQueue);
// }
//}
//catch (OperationCanceledException)
//{
//}
//catch (MqttCommunicationException exception)
//{
// MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
// //message not send, equeue it again
// if (messageInQueue != null)
// _inflightQueue.Add(messageInQueue);
//}
//catch (Exception exception)
//{
// MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
// await DisconnectAsync().ConfigureAwait(false);
//}
//finally
//{
// MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
//}
if (_mqttClient.IsConnected)
{
return ReconnectionResult.StillConnected;
}

try
{
await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
return ReconnectionResult.Reconnected;
}
catch (Exception)
{
return ReconnectionResult.NotConnected;
}
} }


private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
@@ -156,7 +313,7 @@ namespace MQTTnet.Core.ManagedClient
Disconnected?.Invoke(this, eventArgs); Disconnected?.Invoke(this, eventArgs);
} }


private void OnConnected(object sender, EventArgs eventArgs)
private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{ {
Connected?.Invoke(this, eventArgs); Connected?.Invoke(this, eventArgs);
} }


+ 30
- 0
MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs View File

@@ -0,0 +1,30 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.ManagedClient
{
public static class ManagedMqttClientExtensions
{
public static Task EnqueueAsync(this ManagedMqttClient managedClient, params MqttApplicationMessage[] applicationMessages)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));

return managedClient.EnqueueAsync(applicationMessages);
}

public static Task SubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));

return managedClient.SubscribeAsync(topicFilters);
}

public static Task UnsubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));

return managedClient.UnsubscribeAsync(topicFilters);
}
}
}

+ 0
- 1
MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs View File

@@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient
{ {
public IMqttClientOptions ClientOptions { get; set; } public IMqttClientOptions ClientOptions { get; set; }


public bool UseAutoReconnect { get; set; } = true;
public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);


public IManagedMqttClientStorage Storage { get; set; } public IManagedMqttClientStorage Storage { get; set; }


+ 75
- 0
MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs View File

@@ -0,0 +1,75 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Core.ManagedClient
{
public class ManagedMqttClientStorageManager
{
private readonly List<MqttApplicationMessage> _applicationMessages = new List<MqttApplicationMessage>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private IManagedMqttClientStorage _storage;

public async Task SetStorageAsync(IManagedMqttClientStorage storage)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
_storage = storage;
}
finally
{
_semaphore.Release();
}
}

public async Task AddAsync(MqttApplicationMessage applicationMessage)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
if (_storage == null)
{
return;
}

_applicationMessages.Add(applicationMessage);
await SaveAsync().ConfigureAwait(false);
}
finally
{
_semaphore.Release();
}
}

public async Task RemoveAsync(MqttApplicationMessage applicationMessage)
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
if (_storage == null)
{
return;
}

var index = _applicationMessages.IndexOf(applicationMessage);
if (index == -1)
{
return;
}

_applicationMessages.RemoveAt(index);
await SaveAsync().ConfigureAwait(false);
}
finally
{
_semaphore.Release();
}
}

private Task SaveAsync()
{
return _storage.SaveQueuedMessagesAsync(_applicationMessages);
}
}
}

+ 9
- 0
MQTTnet.Core/ManagedClient/ReconnectionResult.cs View File

@@ -0,0 +1,9 @@
namespace MQTTnet.Core.ManagedClient
{
public enum ReconnectionResult
{
StillConnected,
Reconnected,
NotConnected
}
}

+ 10
- 0
MQTTnet.Core/Packets/TopicFilter.cs View File

@@ -13,5 +13,15 @@ namespace MQTTnet.Core.Packets
public string Topic { get; } public string Topic { get; }


public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; }

public override int GetHashCode()
{
return QualityOfServiceLevel.GetHashCode() ^ (Topic ?? string.Empty).GetHashCode();
}

public override string ToString()
{
return Topic + "@" + QualityOfServiceLevel;
}
} }
} }

+ 2
- 1
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -6,10 +6,11 @@ namespace MQTTnet.Core.Server
{ {
public interface IMqttServer public interface IMqttServer
{ {
event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler<MqttClientConnectedEventArgs> ClientConnected; event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;


event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

IList<ConnectedMqttClient> GetConnectedClients(); IList<ConnectedMqttClient> GetConnectedClients();
void Publish(MqttApplicationMessage applicationMessage); void Publish(MqttApplicationMessage applicationMessage);




+ 2
- 2
MQTTnet.sln View File

@@ -1,7 +1,7 @@
 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio 15
VisualStudioVersion = 15.0.27004.2002
VisualStudioVersion = 15.0.27004.2005
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject EndProject
@@ -31,7 +31,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution


+ 1
- 1
Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs View File

@@ -32,7 +32,7 @@ namespace MQTTnet.Core.Tests
{ {
var tcs = new TaskCompletionSource<object>(); var tcs = new TaskCompletionSource<object>();


void Handler(object sender, MqttClientConnectedEventArgs args)
void Handler(object sender, Server.MqttClientConnectedEventArgs args)
{ {
if (args.Client.ClientId == clientId) if (args.Client.ClientId == clientId)
{ {


+ 62
- 0
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -0,0 +1,62 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.ManagedClient;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;

namespace MQTTnet.TestApp.NetCore
{
public static class ManagedClientTest
{
public static async Task RunAsync()
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}");
if (e.TraceMessage.Exception != null)
{
Console.WriteLine(e.TraceMessage.Exception);
}
};

var options = new ManagedMqttClientOptions
{
ClientOptions = new MqttClientTcpOptions
{
Server = "broker.hivemq.com",
ClientId = "MQTTnetManagedClientTest"
},

AutoReconnectDelay = TimeSpan.FromSeconds(1)
};

try
{
var managedClient = new MqttClientFactory().CreateManagedMqttClient();
managedClient.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
};

await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build());
await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build());

await managedClient.StartAsync(options);

await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce));

await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build());

Console.WriteLine("Managed client started.");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}
}

+ 12
- 7
Tests/MQTTnet.TestApp.NetCore/Program.cs View File

@@ -23,22 +23,27 @@ namespace MQTTnet.TestApp.NetCore
Console.WriteLine("1 = Start client"); Console.WriteLine("1 = Start client");
Console.WriteLine("2 = Start server"); Console.WriteLine("2 = Start server");
Console.WriteLine("3 = Start performance test"); Console.WriteLine("3 = Start performance test");
Console.WriteLine("4 = Start managed client");

var pressedKey = Console.ReadKey(true); var pressedKey = Console.ReadKey(true);
if (pressedKey.Key == ConsoleKey.D1)
if (pressedKey.KeyChar == '1')
{ {
Task.Run(RunClientAsync); Task.Run(RunClientAsync);
Thread.Sleep(Timeout.Infinite);
} }
else if (pressedKey.Key == ConsoleKey.D2)
else if (pressedKey.KeyChar == '2')
{ {
Task.Run(() => RunServerAsync());
Thread.Sleep(Timeout.Infinite);
Task.Run(RunServerAsync);
} }
else if (pressedKey.Key == ConsoleKey.D3)
else if (pressedKey.KeyChar == '3')
{ {
Task.Run(PerformanceTest.RunAsync); Task.Run(PerformanceTest.RunAsync);
Thread.Sleep(Timeout.Infinite);
} }
else if (pressedKey.KeyChar == '4')
{
Task.Run(ManagedClientTest.RunAsync);
}

Thread.Sleep(Timeout.Infinite);
} }


private static async Task RunClientAsync() private static async Task RunClientAsync()


Loading…
Cancel
Save