diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index d82e369..b78712d 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -15,6 +15,7 @@
* [Core] Added a builder for application messages using a fluent API
* [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta)
* [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl)
+* [Client] The session state response from the server is now returned in the _ConnectAsync_ method and also part of the _Connected_ event args
Copyright Christian Kratky 2016-2017
MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M
diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs
index 7219c58..04b5dcd 100644
--- a/MQTTnet.Core/Client/IMqttClient.cs
+++ b/MQTTnet.Core/Client/IMqttClient.cs
@@ -10,10 +10,10 @@ namespace MQTTnet.Core.Client
bool IsConnected { get; }
event EventHandler ApplicationMessageReceived;
- event EventHandler Connected;
+ event EventHandler Connected;
event EventHandler Disconnected;
- Task ConnectAsync(IMqttClientOptions options);
+ Task ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync();
Task> SubscribeAsync(IEnumerable topicFilters);
diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs
index 4e595d7..fe1e4ab 100644
--- a/MQTTnet.Core/Client/MqttClient.cs
+++ b/MQTTnet.Core/Client/MqttClient.cs
@@ -33,13 +33,13 @@ namespace MQTTnet.Core.Client
_packetDispatcher = new MqttPacketDispatcher(trace);
}
- public event EventHandler Connected;
+ public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler ApplicationMessageReceived;
public bool IsConnected { get; private set; }
- public async Task ConnectAsync(IMqttClientOptions options)
+ public async Task ConnectAsync(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
@@ -59,7 +59,7 @@ namespace MQTTnet.Core.Client
_trace.Verbose(nameof(MqttClient), "Connection with server established.");
await SetupIncomingPacketProcessingAsync();
- await AuthenticateAsync(options.WillMessage);
+ var connectResponse = await AuthenticateAsync(options.WillMessage);
_trace.Verbose(nameof(MqttClient), "MQTT connection with server established.");
@@ -69,7 +69,8 @@ namespace MQTTnet.Core.Client
}
IsConnected = true;
- Connected?.Invoke(this, EventArgs.Empty);
+ Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent));
+ return new MqttClientConnectResult(connectResponse.IsSessionPresent);
}
catch (Exception)
{
@@ -178,7 +179,7 @@ namespace MQTTnet.Core.Client
}
}
- private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
+ private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage)
{
var connectPacket = new MqttConnectPacket
{
@@ -195,6 +196,8 @@ namespace MQTTnet.Core.Client
{
throw new MqttConnectingFailedException(response.ConnectReturnCode);
}
+
+ return response;
}
private async Task SetupIncomingPacketProcessingAsync()
diff --git a/MQTTnet.Core/Client/MqttClientConnectResult.cs b/MQTTnet.Core/Client/MqttClientConnectResult.cs
new file mode 100644
index 0000000..57db57d
--- /dev/null
+++ b/MQTTnet.Core/Client/MqttClientConnectResult.cs
@@ -0,0 +1,12 @@
+namespace MQTTnet.Core.Client
+{
+ public class MqttClientConnectResult
+ {
+ public MqttClientConnectResult(bool isSessionPresent)
+ {
+ IsSessionPresent = isSessionPresent;
+ }
+
+ public bool IsSessionPresent { get; }
+ }
+}
diff --git a/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs b/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs
new file mode 100644
index 0000000..107a2c9
--- /dev/null
+++ b/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs
@@ -0,0 +1,14 @@
+using System;
+
+namespace MQTTnet.Core.Client
+{
+ public class MqttClientConnectedEventArgs : EventArgs
+ {
+ public MqttClientConnectedEventArgs(bool isSessionPresent)
+ {
+ IsSessionPresent = isSessionPresent;
+ }
+
+ public bool IsSessionPresent { get; }
+ }
+}
diff --git a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs
index 33c91e3..298de9a 100644
--- a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs
+++ b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs
@@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient
{
IMqttClientOptions ClientOptions { get; }
- bool UseAutoReconnect { get; }
TimeSpan AutoReconnectDelay { get; }
IManagedMqttClientStorage Storage { get; }
diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
index 989d117..1a23912 100644
--- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
+++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
@@ -1,4 +1,5 @@
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -7,17 +8,24 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
+using MQTTnet.Core.Protocol;
namespace MQTTnet.Core.ManagedClient
{
public class ManagedMqttClient
{
- private readonly List _messageQueue = new List();
- private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false);
- private readonly MqttClient _mqttClient;
+ private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager();
+ private readonly BlockingCollection _messageQueue = new BlockingCollection();
+ private readonly HashSet _subscriptions = new HashSet();
+
+ private readonly IMqttClient _mqttClient;
private readonly MqttNetTrace _trace;
+ private CancellationTokenSource _connectionCancellationToken;
+ private CancellationTokenSource _publishingCancellationToken;
+
private IManagedMqttClientOptions _options;
+ private bool _subscriptionsNotPushed;
public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace)
{
@@ -30,120 +38,269 @@ namespace MQTTnet.Core.ManagedClient
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
}
- public event EventHandler Connected;
+ public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler ApplicationMessageReceived;
public bool IsConnected => _mqttClient.IsConnected;
-
- public void Start(IManagedMqttClientOptions options)
+ public async Task StartAsync(IManagedMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options));
+ if (!options.ClientOptions.CleanSession)
+ {
+ throw new NotSupportedException("The managed client does not support existing sessions.");
+ }
+
+ if (_connectionCancellationToken != null)
+ {
+ throw new InvalidOperationException("The managed client is already started.");
+ }
+
+ _options = options;
+ await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false);
+ if (_options.Storage != null)
+ {
+ var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
+ foreach (var loadedMessage in loadedMessages)
+ {
+ _messageQueue.Add(loadedMessage);
+ }
+ }
+
+ _connectionCancellationToken = new CancellationTokenSource();
+
+#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
+ Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
+#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
+
+ _trace.Information(nameof(ManagedMqttClient), "Started");
}
- public void Stop()
+ public Task StopAsync()
{
-
+ _connectionCancellationToken?.Cancel(false);
+ _connectionCancellationToken = null;
+
+ while (_messageQueue.Any())
+ {
+ _messageQueue.Take();
+ }
+
+ return Task.FromResult(0);
}
- public async Task ConnectAsync(IManagedMqttClientOptions options)
+ public Task EnqueueAsync(IEnumerable applicationMessages)
{
- //////TODO VERY BAD
- ////_options = options as ManagedMqttClientTcpOptions;
- ////this._usePersistance = _options.Storage != null;
- ////await _mqttClient.ConnectAsync(options);
- ////SetupOutgoingPacketProcessingAsync();
-
- //////load persistentMessages
- ////////if (_usePersistance)
- ////////{
- //////// if (_persistentMessagesManager == null)
- //////// _persistentMessagesManager = new ManagedMqttClientMessagesManager(_options);
- //////// await _persistentMessagesManager.LoadMessagesAsync();
- //////// await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
- ////////}
+ if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));
+
+ foreach (var applicationMessage in applicationMessages)
+ {
+ _messageQueue.Add(applicationMessage);
+ }
+
+ return Task.FromResult(0);
}
- public async Task DisconnectAsync()
+ public Task SubscribeAsync(IEnumerable topicFilters)
{
- await _mqttClient.DisconnectAsync();
+ if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
+
+ lock (_subscriptions)
+ {
+ foreach (var topicFilter in topicFilters)
+ {
+ if (_subscriptions.Add(topicFilter))
+ {
+ _subscriptionsNotPushed = true;
+ }
+ }
+ }
+
+ return Task.FromResult(0);
}
- public async Task UnsubscribeAsync(IEnumerable topicFilters)
+ public Task UnsubscribeAsync(IEnumerable topicFilters)
{
- // TODO: Move all subscriptions to list an subscribe after connection has lost. But only if server session is new.
- await _mqttClient.UnsubscribeAsync(topicFilters);
+ lock (_subscriptions)
+ {
+ foreach (var topicFilter in topicFilters)
+ {
+ if (_subscriptions.Remove(topicFilter))
+ {
+ _subscriptionsNotPushed = true;
+ }
+ }
+ }
+
+ return Task.FromResult(0);
}
- public void Enqueue(IEnumerable applicationMessages)
+ private async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{
- if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false);
+ if (connectionState == ReconnectionResult.NotConnected)
+ {
+ _publishingCancellationToken?.Cancel(false);
+ _publishingCancellationToken = null;
- ThrowIfNotConnected();
+ await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
- _messageQueue.AddRange(applicationMessages);
- _options.Storage?.SaveQueuedMessagesAsync(_messageQueue.ToList());
+ if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed)
+ {
+ await PushSubscriptionsAsync();
- _messageQueueGate.Set();
+ _publishingCancellationToken = new CancellationTokenSource();
+
+#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
+ Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false);
+#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
+
+ continue;
+ }
+
+ if (connectionState == ReconnectionResult.StillConnected)
+ {
+ await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource)
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (MqttCommunicationException exception)
+ {
+ _trace.Warning(nameof(ManagedMqttClient), exception, "Communication exception while maintaining connection.");
+ }
+ catch (Exception exception)
+ {
+ _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while maintaining connection.");
+ }
+ finally
+ {
+ await _mqttClient.DisconnectAsync().ConfigureAwait(false);
+ _trace.Information(nameof(ManagedMqttClient), "Stopped");
+ }
}
-
- public async Task> SubscribeAsync(IEnumerable topicFilters)
+
+ private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var message = _messageQueue.Take(cancellationToken);
+ if (message == null)
+ {
+ continue;
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ continue;
+ }
+
+ await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ finally
+ {
+ _trace.Information(nameof(ManagedMqttClient), "Stopped publishing messages");
+ }
+ }
+
+ private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
{
- return await _mqttClient.SubscribeAsync(topicFilters);
+ try
+ {
+ await _mqttClient.PublishAsync(message).ConfigureAwait(false);
+ }
+ catch (MqttCommunicationException exception)
+ {
+ _trace.Warning(nameof(ManagedMqttClient), exception, "Publishing application message failed.");
+
+ if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
+ {
+ _messageQueue.Add(message);
+ }
+ }
+ catch (Exception exception)
+ {
+ _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while publishing queued application message.");
+ }
}
- private void ThrowIfNotConnected()
+ private async Task SaveAsync(List messages)
{
- if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
- }
+ if (messages == null)
+ {
+ return;
+ }
+
+ var storage = _options.Storage;
+ if (storage != null)
+ {
+ return;
+ }
+
+ await _options.Storage.SaveQueuedMessagesAsync(messages);
+ }
- private void SetupOutgoingPacketProcessingAsync()
+ private async Task PushSubscriptionsAsync()
{
- //Task.Factory.StartNew(
- // () => SendPackets(_mqttClient._cancellationTokenSource.Token),
- // _mqttClient._cancellationTokenSource.Token,
- // TaskCreationOptions.LongRunning,
- // TaskScheduler.Default).ConfigureAwait(false);
+ _trace.Information(nameof(ManagedMqttClient), "Synchronizing subscriptions");
+
+ List subscriptions;
+ lock (_subscriptions)
+ {
+ subscriptions = _subscriptions.ToList();
+ _subscriptionsNotPushed = false;
+ }
+
+ if (!_subscriptions.Any())
+ {
+ return;
+ }
+
+ try
+ {
+ await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ _trace.Warning(nameof(ManagedMqttClient), exception, "Synchronizing subscriptions failed");
+ _subscriptionsNotPushed = true;
+ }
}
- private async Task SendPackets(CancellationToken cancellationToken)
+ private async Task ReconnectIfRequiredAsync()
{
- //MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
- //MqttApplicationMessage messageInQueue = null;
-
- //try
- //{
- // while (!cancellationToken.IsCancellationRequested)
- // {
- // messageInQueue = _inflightQueue.Take();
- // await _mqttClient.PublishAsync(new List() { messageInQueue });
- // if (_usePersistance)
- // await _persistentMessagesManager.Remove(messageInQueue);
- // }
- //}
- //catch (OperationCanceledException)
- //{
- //}
- //catch (MqttCommunicationException exception)
- //{
- // MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
- // //message not send, equeue it again
- // if (messageInQueue != null)
- // _inflightQueue.Add(messageInQueue);
- //}
- //catch (Exception exception)
- //{
- // MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
- // await DisconnectAsync().ConfigureAwait(false);
- //}
- //finally
- //{
- // MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
- //}
+ if (_mqttClient.IsConnected)
+ {
+ return ReconnectionResult.StillConnected;
+ }
+
+ try
+ {
+ await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
+ return ReconnectionResult.Reconnected;
+ }
+ catch (Exception)
+ {
+ return ReconnectionResult.NotConnected;
+ }
}
private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
@@ -156,7 +313,7 @@ namespace MQTTnet.Core.ManagedClient
Disconnected?.Invoke(this, eventArgs);
}
- private void OnConnected(object sender, EventArgs eventArgs)
+ private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{
Connected?.Invoke(this, eventArgs);
}
diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs
new file mode 100644
index 0000000..62aa166
--- /dev/null
+++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Threading.Tasks;
+using MQTTnet.Core.Packets;
+
+namespace MQTTnet.Core.ManagedClient
+{
+ public static class ManagedMqttClientExtensions
+ {
+ public static Task EnqueueAsync(this ManagedMqttClient managedClient, params MqttApplicationMessage[] applicationMessages)
+ {
+ if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));
+
+ return managedClient.EnqueueAsync(applicationMessages);
+ }
+
+ public static Task SubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
+ {
+ if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));
+
+ return managedClient.SubscribeAsync(topicFilters);
+ }
+
+ public static Task UnsubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters)
+ {
+ if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));
+
+ return managedClient.UnsubscribeAsync(topicFilters);
+ }
+ }
+}
diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs
index bcddb63..2e24fd0 100644
--- a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs
+++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs
@@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient
{
public IMqttClientOptions ClientOptions { get; set; }
- public bool UseAutoReconnect { get; set; } = true;
public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);
public IManagedMqttClientStorage Storage { get; set; }
diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs
new file mode 100644
index 0000000..0810f36
--- /dev/null
+++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs
@@ -0,0 +1,75 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Core.ManagedClient
+{
+ public class ManagedMqttClientStorageManager
+ {
+ private readonly List _applicationMessages = new List();
+ private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
+ private IManagedMqttClientStorage _storage;
+
+ public async Task SetStorageAsync(IManagedMqttClientStorage storage)
+ {
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ _storage = storage;
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ public async Task AddAsync(MqttApplicationMessage applicationMessage)
+ {
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ if (_storage == null)
+ {
+ return;
+ }
+
+ _applicationMessages.Add(applicationMessage);
+ await SaveAsync().ConfigureAwait(false);
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ public async Task RemoveAsync(MqttApplicationMessage applicationMessage)
+ {
+ await _semaphore.WaitAsync().ConfigureAwait(false);
+ try
+ {
+ if (_storage == null)
+ {
+ return;
+ }
+
+ var index = _applicationMessages.IndexOf(applicationMessage);
+ if (index == -1)
+ {
+ return;
+ }
+
+ _applicationMessages.RemoveAt(index);
+ await SaveAsync().ConfigureAwait(false);
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ private Task SaveAsync()
+ {
+ return _storage.SaveQueuedMessagesAsync(_applicationMessages);
+ }
+ }
+}
diff --git a/MQTTnet.Core/ManagedClient/ReconnectionResult.cs b/MQTTnet.Core/ManagedClient/ReconnectionResult.cs
new file mode 100644
index 0000000..fae0770
--- /dev/null
+++ b/MQTTnet.Core/ManagedClient/ReconnectionResult.cs
@@ -0,0 +1,9 @@
+namespace MQTTnet.Core.ManagedClient
+{
+ public enum ReconnectionResult
+ {
+ StillConnected,
+ Reconnected,
+ NotConnected
+ }
+}
diff --git a/MQTTnet.Core/Packets/TopicFilter.cs b/MQTTnet.Core/Packets/TopicFilter.cs
index e2ae422..6786b1d 100644
--- a/MQTTnet.Core/Packets/TopicFilter.cs
+++ b/MQTTnet.Core/Packets/TopicFilter.cs
@@ -13,5 +13,15 @@ namespace MQTTnet.Core.Packets
public string Topic { get; }
public MqttQualityOfServiceLevel QualityOfServiceLevel { get; }
+
+ public override int GetHashCode()
+ {
+ return QualityOfServiceLevel.GetHashCode() ^ (Topic ?? string.Empty).GetHashCode();
+ }
+
+ public override string ToString()
+ {
+ return Topic + "@" + QualityOfServiceLevel;
+ }
}
}
\ No newline at end of file
diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs
index a35a266..db3e1b0 100644
--- a/MQTTnet.Core/Server/IMqttServer.cs
+++ b/MQTTnet.Core/Server/IMqttServer.cs
@@ -6,10 +6,11 @@ namespace MQTTnet.Core.Server
{
public interface IMqttServer
{
- event EventHandler ApplicationMessageReceived;
event EventHandler ClientConnected;
event EventHandler ClientDisconnected;
+ event EventHandler ApplicationMessageReceived;
+
IList GetConnectedClients();
void Publish(MqttApplicationMessage applicationMessage);
diff --git a/MQTTnet.sln b/MQTTnet.sln
index db44de8..56a56aa5 100644
--- a/MQTTnet.sln
+++ b/MQTTnet.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
-VisualStudioVersion = 15.0.27004.2002
+VisualStudioVersion = 15.0.27004.2005
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
@@ -31,7 +31,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
index 32fc2cc..ae15cc2 100644
--- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
+++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
@@ -32,7 +32,7 @@ namespace MQTTnet.Core.Tests
{
var tcs = new TaskCompletionSource