From 24c35de33035f82a0d78df0bf58d30a696210015 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sat, 21 Oct 2017 15:49:50 +0200 Subject: [PATCH] Refactor manage client --- Build/MQTTnet.nuspec | 1 + MQTTnet.Core/Client/IMqttClient.cs | 4 +- MQTTnet.Core/Client/MqttClient.cs | 13 +- .../Client/MqttClientConnectResult.cs | 12 + .../Client/MqttClientConnectedEventArgs.cs | 14 + .../IManagedMqttClientOptions.cs | 1 - .../ManagedClient/ManagedMqttClient.cs | 317 +++++++++++++----- .../ManagedMqttClientExtensions.cs | 30 ++ .../ManagedClient/ManagedMqttClientOptions.cs | 1 - .../ManagedMqttClientStorageManager.cs | 75 +++++ .../ManagedClient/ReconnectionResult.cs | 9 + MQTTnet.Core/Packets/TopicFilter.cs | 10 + MQTTnet.Core/Server/IMqttServer.cs | 3 +- MQTTnet.sln | 4 +- .../TestMqttServerAdapter.cs | 2 +- .../ManagedClientTest.cs | 62 ++++ Tests/MQTTnet.TestApp.NetCore/Program.cs | 19 +- 17 files changed, 477 insertions(+), 100 deletions(-) create mode 100644 MQTTnet.Core/Client/MqttClientConnectResult.cs create mode 100644 MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs create mode 100644 MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs create mode 100644 MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs create mode 100644 MQTTnet.Core/ManagedClient/ReconnectionResult.cs create mode 100644 Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index d82e369..b78712d 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -15,6 +15,7 @@ * [Core] Added a builder for application messages using a fluent API * [Client] Added a first version of a managed client which will manage the connection, subscription etc. automatically (Thanks to @JTrotta) * [Server] Added support for WebSockets via ASP.NET Core 2.0 (Thanks to @ChristianRiedl) +* [Client] The session state response from the server is now returned in the _ConnectAsync_ method and also part of the _Connected_ event args Copyright Christian Kratky 2016-2017 MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs index 7219c58..04b5dcd 100644 --- a/MQTTnet.Core/Client/IMqttClient.cs +++ b/MQTTnet.Core/Client/IMqttClient.cs @@ -10,10 +10,10 @@ namespace MQTTnet.Core.Client bool IsConnected { get; } event EventHandler ApplicationMessageReceived; - event EventHandler Connected; + event EventHandler Connected; event EventHandler Disconnected; - Task ConnectAsync(IMqttClientOptions options); + Task ConnectAsync(IMqttClientOptions options); Task DisconnectAsync(); Task> SubscribeAsync(IEnumerable topicFilters); diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 4e595d7..fe1e4ab 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -33,13 +33,13 @@ namespace MQTTnet.Core.Client _packetDispatcher = new MqttPacketDispatcher(trace); } - public event EventHandler Connected; + public event EventHandler Connected; public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; public bool IsConnected { get; private set; } - public async Task ConnectAsync(IMqttClientOptions options) + public async Task ConnectAsync(IMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); @@ -59,7 +59,7 @@ namespace MQTTnet.Core.Client _trace.Verbose(nameof(MqttClient), "Connection with server established."); await SetupIncomingPacketProcessingAsync(); - await AuthenticateAsync(options.WillMessage); + var connectResponse = await AuthenticateAsync(options.WillMessage); _trace.Verbose(nameof(MqttClient), "MQTT connection with server established."); @@ -69,7 +69,8 @@ namespace MQTTnet.Core.Client } IsConnected = true; - Connected?.Invoke(this, EventArgs.Empty); + Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); + return new MqttClientConnectResult(connectResponse.IsSessionPresent); } catch (Exception) { @@ -178,7 +179,7 @@ namespace MQTTnet.Core.Client } } - private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) + private async Task AuthenticateAsync(MqttApplicationMessage willApplicationMessage) { var connectPacket = new MqttConnectPacket { @@ -195,6 +196,8 @@ namespace MQTTnet.Core.Client { throw new MqttConnectingFailedException(response.ConnectReturnCode); } + + return response; } private async Task SetupIncomingPacketProcessingAsync() diff --git a/MQTTnet.Core/Client/MqttClientConnectResult.cs b/MQTTnet.Core/Client/MqttClientConnectResult.cs new file mode 100644 index 0000000..57db57d --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientConnectResult.cs @@ -0,0 +1,12 @@ +namespace MQTTnet.Core.Client +{ + public class MqttClientConnectResult + { + public MqttClientConnectResult(bool isSessionPresent) + { + IsSessionPresent = isSessionPresent; + } + + public bool IsSessionPresent { get; } + } +} diff --git a/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs b/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs new file mode 100644 index 0000000..107a2c9 --- /dev/null +++ b/MQTTnet.Core/Client/MqttClientConnectedEventArgs.cs @@ -0,0 +1,14 @@ +using System; + +namespace MQTTnet.Core.Client +{ + public class MqttClientConnectedEventArgs : EventArgs + { + public MqttClientConnectedEventArgs(bool isSessionPresent) + { + IsSessionPresent = isSessionPresent; + } + + public bool IsSessionPresent { get; } + } +} diff --git a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs index 33c91e3..298de9a 100644 --- a/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs +++ b/MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs @@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient { IMqttClientOptions ClientOptions { get; } - bool UseAutoReconnect { get; } TimeSpan AutoReconnectDelay { get; } IManagedMqttClientStorage Storage { get; } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs index 989d117..1a23912 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -7,17 +8,24 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; namespace MQTTnet.Core.ManagedClient { public class ManagedMqttClient { - private readonly List _messageQueue = new List(); - private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false); - private readonly MqttClient _mqttClient; + private readonly ManagedMqttClientStorageManager _storageManager = new ManagedMqttClientStorageManager(); + private readonly BlockingCollection _messageQueue = new BlockingCollection(); + private readonly HashSet _subscriptions = new HashSet(); + + private readonly IMqttClient _mqttClient; private readonly MqttNetTrace _trace; + private CancellationTokenSource _connectionCancellationToken; + private CancellationTokenSource _publishingCancellationToken; + private IManagedMqttClientOptions _options; + private bool _subscriptionsNotPushed; public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory, MqttNetTrace trace) { @@ -30,120 +38,269 @@ namespace MQTTnet.Core.ManagedClient _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; } - public event EventHandler Connected; + public event EventHandler Connected; public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; public bool IsConnected => _mqttClient.IsConnected; - - public void Start(IManagedMqttClientOptions options) + public async Task StartAsync(IManagedMqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); if (options.ClientOptions == null) throw new ArgumentException("The client options are not set.", nameof(options)); + if (!options.ClientOptions.CleanSession) + { + throw new NotSupportedException("The managed client does not support existing sessions."); + } + + if (_connectionCancellationToken != null) + { + throw new InvalidOperationException("The managed client is already started."); + } + + _options = options; + await _storageManager.SetStorageAsync(_options.Storage).ConfigureAwait(false); + if (_options.Storage != null) + { + var loadedMessages = await _options.Storage.LoadQueuedMessagesAsync().ConfigureAwait(false); + foreach (var loadedMessage in loadedMessages) + { + _messageQueue.Add(loadedMessage); + } + } + + _connectionCancellationToken = new CancellationTokenSource(); + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + _trace.Information(nameof(ManagedMqttClient), "Started"); } - public void Stop() + public Task StopAsync() { - + _connectionCancellationToken?.Cancel(false); + _connectionCancellationToken = null; + + while (_messageQueue.Any()) + { + _messageQueue.Take(); + } + + return Task.FromResult(0); } - public async Task ConnectAsync(IManagedMqttClientOptions options) + public Task EnqueueAsync(IEnumerable applicationMessages) { - //////TODO VERY BAD - ////_options = options as ManagedMqttClientTcpOptions; - ////this._usePersistance = _options.Storage != null; - ////await _mqttClient.ConnectAsync(options); - ////SetupOutgoingPacketProcessingAsync(); - - //////load persistentMessages - ////////if (_usePersistance) - ////////{ - //////// if (_persistentMessagesManager == null) - //////// _persistentMessagesManager = new ManagedMqttClientMessagesManager(_options); - //////// await _persistentMessagesManager.LoadMessagesAsync(); - //////// await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false); - ////////} + if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); + + foreach (var applicationMessage in applicationMessages) + { + _messageQueue.Add(applicationMessage); + } + + return Task.FromResult(0); } - public async Task DisconnectAsync() + public Task SubscribeAsync(IEnumerable topicFilters) { - await _mqttClient.DisconnectAsync(); + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + lock (_subscriptions) + { + foreach (var topicFilter in topicFilters) + { + if (_subscriptions.Add(topicFilter)) + { + _subscriptionsNotPushed = true; + } + } + } + + return Task.FromResult(0); } - public async Task UnsubscribeAsync(IEnumerable topicFilters) + public Task UnsubscribeAsync(IEnumerable topicFilters) { - // TODO: Move all subscriptions to list an subscribe after connection has lost. But only if server session is new. - await _mqttClient.UnsubscribeAsync(topicFilters); + lock (_subscriptions) + { + foreach (var topicFilter in topicFilters) + { + if (_subscriptions.Remove(topicFilter)) + { + _subscriptionsNotPushed = true; + } + } + } + + return Task.FromResult(0); } - public void Enqueue(IEnumerable applicationMessages) + private async Task MaintainConnectionAsync(CancellationToken cancellationToken) { - if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); + try + { + while (!cancellationToken.IsCancellationRequested) + { + var connectionState = await ReconnectIfRequiredAsync().ConfigureAwait(false); + if (connectionState == ReconnectionResult.NotConnected) + { + _publishingCancellationToken?.Cancel(false); + _publishingCancellationToken = null; - ThrowIfNotConnected(); + await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false); + continue; + } - _messageQueue.AddRange(applicationMessages); - _options.Storage?.SaveQueuedMessagesAsync(_messageQueue.ToList()); + if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) + { + await PushSubscriptionsAsync(); - _messageQueueGate.Set(); + _publishingCancellationToken = new CancellationTokenSource(); + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Factory.StartNew(() => PublishQueuedMessagesAsync(_publishingCancellationToken.Token), _publishingCancellationToken.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + continue; + } + + if (connectionState == ReconnectionResult.StillConnected) + { + await Task.Delay(100, _connectionCancellationToken.Token).ConfigureAwait(false); // Consider using the _Disconnected_ event here. (TaskCompletionSource) + } + } + } + catch (OperationCanceledException) + { + } + catch (MqttCommunicationException exception) + { + _trace.Warning(nameof(ManagedMqttClient), exception, "Communication exception while maintaining connection."); + } + catch (Exception exception) + { + _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while maintaining connection."); + } + finally + { + await _mqttClient.DisconnectAsync().ConfigureAwait(false); + _trace.Information(nameof(ManagedMqttClient), "Stopped"); + } } - - public async Task> SubscribeAsync(IEnumerable topicFilters) + + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) + { + try + { + while (!cancellationToken.IsCancellationRequested) + { + var message = _messageQueue.Take(cancellationToken); + if (message == null) + { + continue; + } + + if (cancellationToken.IsCancellationRequested) + { + continue; + } + + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + } + finally + { + _trace.Information(nameof(ManagedMqttClient), "Stopped publishing messages"); + } + } + + private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) { - return await _mqttClient.SubscribeAsync(topicFilters); + try + { + await _mqttClient.PublishAsync(message).ConfigureAwait(false); + } + catch (MqttCommunicationException exception) + { + _trace.Warning(nameof(ManagedMqttClient), exception, "Publishing application message failed."); + + if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) + { + _messageQueue.Add(message); + } + } + catch (Exception exception) + { + _trace.Error(nameof(ManagedMqttClient), exception, "Unhandled exception while publishing queued application message."); + } } - private void ThrowIfNotConnected() + private async Task SaveAsync(List messages) { - if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); - } + if (messages == null) + { + return; + } + + var storage = _options.Storage; + if (storage != null) + { + return; + } + + await _options.Storage.SaveQueuedMessagesAsync(messages); + } - private void SetupOutgoingPacketProcessingAsync() + private async Task PushSubscriptionsAsync() { - //Task.Factory.StartNew( - // () => SendPackets(_mqttClient._cancellationTokenSource.Token), - // _mqttClient._cancellationTokenSource.Token, - // TaskCreationOptions.LongRunning, - // TaskScheduler.Default).ConfigureAwait(false); + _trace.Information(nameof(ManagedMqttClient), "Synchronizing subscriptions"); + + List subscriptions; + lock (_subscriptions) + { + subscriptions = _subscriptions.ToList(); + _subscriptionsNotPushed = false; + } + + if (!_subscriptions.Any()) + { + return; + } + + try + { + await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false); + } + catch (Exception exception) + { + _trace.Warning(nameof(ManagedMqttClient), exception, "Synchronizing subscriptions failed"); + _subscriptionsNotPushed = true; + } } - private async Task SendPackets(CancellationToken cancellationToken) + private async Task ReconnectIfRequiredAsync() { - //MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets."); - //MqttApplicationMessage messageInQueue = null; - - //try - //{ - // while (!cancellationToken.IsCancellationRequested) - // { - // messageInQueue = _inflightQueue.Take(); - // await _mqttClient.PublishAsync(new List() { messageInQueue }); - // if (_usePersistance) - // await _persistentMessagesManager.Remove(messageInQueue); - // } - //} - //catch (OperationCanceledException) - //{ - //} - //catch (MqttCommunicationException exception) - //{ - // MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets."); - // //message not send, equeue it again - // if (messageInQueue != null) - // _inflightQueue.Add(messageInQueue); - //} - //catch (Exception exception) - //{ - // MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets."); - // await DisconnectAsync().ConfigureAwait(false); - //} - //finally - //{ - // MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets."); - //} + if (_mqttClient.IsConnected) + { + return ReconnectionResult.StillConnected; + } + + try + { + await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false); + return ReconnectionResult.Reconnected; + } + catch (Exception) + { + return ReconnectionResult.NotConnected; + } } private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) @@ -156,7 +313,7 @@ namespace MQTTnet.Core.ManagedClient Disconnected?.Invoke(this, eventArgs); } - private void OnConnected(object sender, EventArgs eventArgs) + private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs) { Connected?.Invoke(this, eventArgs); } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs new file mode 100644 index 0000000..62aa166 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientExtensions.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.ManagedClient +{ + public static class ManagedMqttClientExtensions + { + public static Task EnqueueAsync(this ManagedMqttClient managedClient, params MqttApplicationMessage[] applicationMessages) + { + if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + + return managedClient.EnqueueAsync(applicationMessages); + } + + public static Task SubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters) + { + if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + + return managedClient.SubscribeAsync(topicFilters); + } + + public static Task UnsubscribeAsync(this ManagedMqttClient managedClient, params TopicFilter[] topicFilters) + { + if (managedClient == null) throw new ArgumentNullException(nameof(managedClient)); + + return managedClient.UnsubscribeAsync(topicFilters); + } + } +} diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs index bcddb63..2e24fd0 100644 --- a/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs @@ -7,7 +7,6 @@ namespace MQTTnet.Core.ManagedClient { public IMqttClientOptions ClientOptions { get; set; } - public bool UseAutoReconnect { get; set; } = true; public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5); public IManagedMqttClientStorage Storage { get; set; } diff --git a/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs b/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs new file mode 100644 index 0000000..0810f36 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/ManagedMqttClientStorageManager.cs @@ -0,0 +1,75 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Core.ManagedClient +{ + public class ManagedMqttClientStorageManager + { + private readonly List _applicationMessages = new List(); + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private IManagedMqttClientStorage _storage; + + public async Task SetStorageAsync(IManagedMqttClientStorage storage) + { + await _semaphore.WaitAsync().ConfigureAwait(false); + try + { + _storage = storage; + } + finally + { + _semaphore.Release(); + } + } + + public async Task AddAsync(MqttApplicationMessage applicationMessage) + { + await _semaphore.WaitAsync().ConfigureAwait(false); + try + { + if (_storage == null) + { + return; + } + + _applicationMessages.Add(applicationMessage); + await SaveAsync().ConfigureAwait(false); + } + finally + { + _semaphore.Release(); + } + } + + public async Task RemoveAsync(MqttApplicationMessage applicationMessage) + { + await _semaphore.WaitAsync().ConfigureAwait(false); + try + { + if (_storage == null) + { + return; + } + + var index = _applicationMessages.IndexOf(applicationMessage); + if (index == -1) + { + return; + } + + _applicationMessages.RemoveAt(index); + await SaveAsync().ConfigureAwait(false); + } + finally + { + _semaphore.Release(); + } + } + + private Task SaveAsync() + { + return _storage.SaveQueuedMessagesAsync(_applicationMessages); + } + } +} diff --git a/MQTTnet.Core/ManagedClient/ReconnectionResult.cs b/MQTTnet.Core/ManagedClient/ReconnectionResult.cs new file mode 100644 index 0000000..fae0770 --- /dev/null +++ b/MQTTnet.Core/ManagedClient/ReconnectionResult.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Core.ManagedClient +{ + public enum ReconnectionResult + { + StillConnected, + Reconnected, + NotConnected + } +} diff --git a/MQTTnet.Core/Packets/TopicFilter.cs b/MQTTnet.Core/Packets/TopicFilter.cs index e2ae422..6786b1d 100644 --- a/MQTTnet.Core/Packets/TopicFilter.cs +++ b/MQTTnet.Core/Packets/TopicFilter.cs @@ -13,5 +13,15 @@ namespace MQTTnet.Core.Packets public string Topic { get; } public MqttQualityOfServiceLevel QualityOfServiceLevel { get; } + + public override int GetHashCode() + { + return QualityOfServiceLevel.GetHashCode() ^ (Topic ?? string.Empty).GetHashCode(); + } + + public override string ToString() + { + return Topic + "@" + QualityOfServiceLevel; + } } } \ No newline at end of file diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs index a35a266..db3e1b0 100644 --- a/MQTTnet.Core/Server/IMqttServer.cs +++ b/MQTTnet.Core/Server/IMqttServer.cs @@ -6,10 +6,11 @@ namespace MQTTnet.Core.Server { public interface IMqttServer { - event EventHandler ApplicationMessageReceived; event EventHandler ClientConnected; event EventHandler ClientDisconnected; + event EventHandler ApplicationMessageReceived; + IList GetConnectedClients(); void Publish(MqttApplicationMessage applicationMessage); diff --git a/MQTTnet.sln b/MQTTnet.sln index db44de8..56a56aa5 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.27004.2002 +VisualStudioVersion = 15.0.27004.2005 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -31,7 +31,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.NetCore", "Tests\MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{3D283AAD-AAA8-4339-8394-52F80B6304DB}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.TestApp.AspNetCore2", "Tests\MQTTnet.TestApp.AspNetCore2\MQTTnet.TestApp.AspNetCore2.csproj", "{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index 32fc2cc..ae15cc2 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -32,7 +32,7 @@ namespace MQTTnet.Core.Tests { var tcs = new TaskCompletionSource(); - void Handler(object sender, MqttClientConnectedEventArgs args) + void Handler(object sender, Server.MqttClientConnectedEventArgs args) { if (args.Client.ClientId == clientId) { diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs new file mode 100644 index 0000000..4959f37 --- /dev/null +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -0,0 +1,62 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.ManagedClient; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; + +namespace MQTTnet.TestApp.NetCore +{ + public static class ManagedClientTest + { + public static async Task RunAsync() + { + MqttNetTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"); + if (e.TraceMessage.Exception != null) + { + Console.WriteLine(e.TraceMessage.Exception); + } + }; + + var options = new ManagedMqttClientOptions + { + ClientOptions = new MqttClientTcpOptions + { + Server = "broker.hivemq.com", + ClientId = "MQTTnetManagedClientTest" + }, + + AutoReconnectDelay = TimeSpan.FromSeconds(1) + }; + + try + { + var managedClient = new MqttClientFactory().CreateManagedMqttClient(); + managedClient.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); + }; + + await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build()); + await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build()); + + await managedClient.StartAsync(options); + + await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); + + await managedClient.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build()); + + Console.WriteLine("Managed client started."); + Console.ReadLine(); + } + catch (Exception e) + { + Console.WriteLine(e); + } + } + } +} diff --git a/Tests/MQTTnet.TestApp.NetCore/Program.cs b/Tests/MQTTnet.TestApp.NetCore/Program.cs index dab8235..0b5cf64 100644 --- a/Tests/MQTTnet.TestApp.NetCore/Program.cs +++ b/Tests/MQTTnet.TestApp.NetCore/Program.cs @@ -23,22 +23,27 @@ namespace MQTTnet.TestApp.NetCore Console.WriteLine("1 = Start client"); Console.WriteLine("2 = Start server"); Console.WriteLine("3 = Start performance test"); + Console.WriteLine("4 = Start managed client"); + var pressedKey = Console.ReadKey(true); - if (pressedKey.Key == ConsoleKey.D1) + if (pressedKey.KeyChar == '1') { Task.Run(RunClientAsync); - Thread.Sleep(Timeout.Infinite); } - else if (pressedKey.Key == ConsoleKey.D2) + else if (pressedKey.KeyChar == '2') { - Task.Run(() => RunServerAsync()); - Thread.Sleep(Timeout.Infinite); + Task.Run(RunServerAsync); } - else if (pressedKey.Key == ConsoleKey.D3) + else if (pressedKey.KeyChar == '3') { Task.Run(PerformanceTest.RunAsync); - Thread.Sleep(Timeout.Infinite); } + else if (pressedKey.KeyChar == '4') + { + Task.Run(ManagedClientTest.RunAsync); + } + + Thread.Sleep(Timeout.Infinite); } private static async Task RunClientAsync()