@@ -12,6 +12,8 @@ | |||||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | <description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | ||||
<releaseNotes>* [Server] Added new method which exposes all retained messages. | <releaseNotes>* [Server] Added new method which exposes all retained messages. | ||||
* [Server] Removed (wrong) setter from the server options interface. | * [Server] Removed (wrong) setter from the server options interface. | ||||
* [ManagedClient] Added max pending messages count option. | |||||
* [ManagedClient] Add pending messages overflow strategy option. | |||||
</releaseNotes> | </releaseNotes> | ||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
@@ -25,7 +25,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov | |||||
* Performance optimized (processing ~70.000 messages / second)* | * Performance optimized (processing ~70.000 messages / second)* | ||||
* Interfaces included for mocking and testing | * Interfaces included for mocking and testing | ||||
* Access to internal trace messages | * Access to internal trace messages | ||||
* Unit tested (~90 tests) | |||||
* Unit tested (~100 tests) | |||||
\* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_. | \* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_. | ||||
@@ -0,0 +1,14 @@ | |||||
using System; | |||||
namespace MQTTnet.Extensions.ManagedClient | |||||
{ | |||||
public class ApplicationMessageSkippedEventArgs : EventArgs | |||||
{ | |||||
public ApplicationMessageSkippedEventArgs(ManagedMqttApplicationMessage applicationMessage) | |||||
{ | |||||
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); | |||||
} | |||||
public ManagedMqttApplicationMessage ApplicationMessage { get; } | |||||
} | |||||
} |
@@ -15,6 +15,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | event EventHandler<MqttClientDisconnectedEventArgs> Disconnected; | ||||
event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | ||||
event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped; | |||||
event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed; | event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed; | ||||
event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed; | event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed; | ||||
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Server; | |||||
namespace MQTTnet.Extensions.ManagedClient | namespace MQTTnet.Extensions.ManagedClient | ||||
{ | { | ||||
@@ -12,5 +13,9 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
TimeSpan ConnectionCheckInterval { get; } | TimeSpan ConnectionCheckInterval { get; } | ||||
IManagedMqttClientStorage Storage { get; } | IManagedMqttClientStorage Storage { get; } | ||||
int MaxPendingMessages { get; } | |||||
MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; } | |||||
} | } | ||||
} | } |
@@ -1,5 +1,4 @@ | |||||
using System; | using System; | ||||
using System.Collections.Concurrent; | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Linq; | using System.Linq; | ||||
using System.Threading; | using System.Threading; | ||||
@@ -7,13 +6,17 @@ using System.Threading.Tasks; | |||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Exceptions; | using MQTTnet.Exceptions; | ||||
using MQTTnet.Internal; | |||||
using MQTTnet.Protocol; | using MQTTnet.Protocol; | ||||
using MQTTnet.Server; | |||||
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; | |||||
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; | |||||
namespace MQTTnet.Extensions.ManagedClient | namespace MQTTnet.Extensions.ManagedClient | ||||
{ | { | ||||
public class ManagedMqttClient : IManagedMqttClient | public class ManagedMqttClient : IManagedMqttClient | ||||
{ | { | ||||
private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>(); | |||||
private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>(); | |||||
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); | private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); | ||||
private readonly HashSet<string> _unsubscriptions = new HashSet<string>(); | private readonly HashSet<string> _unsubscriptions = new HashSet<string>(); | ||||
@@ -50,6 +53,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; | ||||
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed; | ||||
public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped; | |||||
public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed; | public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed; | ||||
public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed; | public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed; | ||||
@@ -75,7 +79,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
foreach (var message in messages) | foreach (var message in messages) | ||||
{ | { | ||||
_messageQueue.Add(message); | |||||
_messageQueue.Enqueue(message); | |||||
} | } | ||||
} | } | ||||
@@ -93,10 +97,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
StopPublishing(); | StopPublishing(); | ||||
StopMaintainingConnection(); | StopMaintainingConnection(); | ||||
while (_messageQueue.Any()) | |||||
{ | |||||
_messageQueue.Take(); | |||||
} | |||||
_messageQueue.Clear(); | |||||
return Task.FromResult(0); | return Task.FromResult(0); | ||||
} | } | ||||
@@ -112,12 +113,38 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
{ | { | ||||
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); | if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); | ||||
ManagedMqttApplicationMessage skippedMessage = null; | |||||
lock (_messageQueue) | |||||
{ | |||||
if (_messageQueue.Count > _options.MaxPendingMessages) | |||||
{ | |||||
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) | |||||
{ | |||||
_logger.Verbose("Skipping publish of new application message because internal queue is full."); | |||||
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage)); | |||||
return; | |||||
} | |||||
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) | |||||
{ | |||||
skippedMessage = _messageQueue.RemoveFirst(); | |||||
_logger.Verbose("Removed oldest application message from internal queue because it is full."); | |||||
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(skippedMessage)); | |||||
} | |||||
} | |||||
_messageQueue.Enqueue(applicationMessage); | |||||
} | |||||
if (_storageManager != null) | if (_storageManager != null) | ||||
{ | { | ||||
if (skippedMessage != null) | |||||
{ | |||||
await _storageManager.RemoveAsync(skippedMessage).ConfigureAwait(false); | |||||
} | |||||
await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); | await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); | ||||
} | } | ||||
_messageQueue.Add(applicationMessage); | |||||
} | } | ||||
public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) | public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) | ||||
@@ -157,7 +184,6 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
_messageQueue?.Dispose(); | |||||
_connectionCancellationToken?.Dispose(); | _connectionCancellationToken?.Dispose(); | ||||
_publishingCancellationToken?.Dispose(); | _publishingCancellationToken?.Dispose(); | ||||
} | } | ||||
@@ -228,7 +254,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) | while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) | ||||
{ | { | ||||
var message = _messageQueue.Take(cancellationToken); | |||||
var message = _messageQueue.Dequeue(); | |||||
if (message == null) | if (message == null) | ||||
{ | { | ||||
continue; | continue; | ||||
@@ -268,7 +294,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | ||||
{ | { | ||||
_messageQueue.Add(message); | |||||
_messageQueue.Enqueue(message); | |||||
} | } | ||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Server; | |||||
namespace MQTTnet.Extensions.ManagedClient | namespace MQTTnet.Extensions.ManagedClient | ||||
{ | { | ||||
@@ -12,5 +13,9 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1); | public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1); | ||||
public IManagedMqttClientStorage Storage { get; set; } | public IManagedMqttClientStorage Storage { get; set; } | ||||
public int MaxPendingMessages { get; set; } = int.MaxValue; | |||||
public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropNewMessage; | |||||
} | } | ||||
} | } |
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Server; | |||||
namespace MQTTnet.Extensions.ManagedClient | namespace MQTTnet.Extensions.ManagedClient | ||||
{ | { | ||||
@@ -8,6 +9,18 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
private readonly ManagedMqttClientOptions _options = new ManagedMqttClientOptions(); | private readonly ManagedMqttClientOptions _options = new ManagedMqttClientOptions(); | ||||
private MqttClientOptionsBuilder _clientOptionsBuilder; | private MqttClientOptionsBuilder _clientOptionsBuilder; | ||||
public ManagedMqttClientOptionsBuilder WithMaxPendingMessages(int value) | |||||
{ | |||||
_options.MaxPendingMessages = value; | |||||
return this; | |||||
} | |||||
public ManagedMqttClientOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value) | |||||
{ | |||||
_options.PendingMessagesOverflowStrategy = value; | |||||
return this; | |||||
} | |||||
public ManagedMqttClientOptionsBuilder WithAutoReconnectDelay(TimeSpan value) | public ManagedMqttClientOptionsBuilder WithAutoReconnectDelay(TimeSpan value) | ||||
{ | { | ||||
_options.AutoReconnectDelay = value; | _options.AutoReconnectDelay = value; | ||||
@@ -0,0 +1,77 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
namespace MQTTnet.Internal | |||||
{ | |||||
public class BlockingQueue<TItem> | |||||
{ | |||||
private readonly object _syncRoot = new object(); | |||||
private readonly LinkedList<TItem> _items = new LinkedList<TItem>(); | |||||
private readonly ManualResetEvent _gate = new ManualResetEvent(false); | |||||
public int Count | |||||
{ | |||||
get | |||||
{ | |||||
lock (_syncRoot) | |||||
{ | |||||
return _items.Count; | |||||
} | |||||
} | |||||
} | |||||
public void Enqueue(TItem item) | |||||
{ | |||||
if (item == null) throw new ArgumentNullException(nameof(item)); | |||||
lock (_syncRoot) | |||||
{ | |||||
_items.AddLast(item); | |||||
_gate.Set(); | |||||
} | |||||
} | |||||
public TItem Dequeue() | |||||
{ | |||||
while (true) | |||||
{ | |||||
lock (_syncRoot) | |||||
{ | |||||
if (_items.Count > 0) | |||||
{ | |||||
var item = _items.First.Value; | |||||
_items.RemoveFirst(); | |||||
return item; | |||||
} | |||||
if (_items.Count == 0) | |||||
{ | |||||
_gate.Reset(); | |||||
} | |||||
} | |||||
_gate.WaitOne(); | |||||
} | |||||
} | |||||
public TItem RemoveFirst() | |||||
{ | |||||
lock (_syncRoot) | |||||
{ | |||||
var item = _items.First; | |||||
_items.RemoveFirst(); | |||||
return item.Value; | |||||
} | |||||
} | |||||
public void Clear() | |||||
{ | |||||
lock (_syncRoot) | |||||
{ | |||||
_items.Clear(); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,108 @@ | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||||
using MQTTnet.Internal; | |||||
namespace MQTTnet.Core.Tests | |||||
{ | |||||
[TestClass] | |||||
public class BlockingQueueTests | |||||
{ | |||||
[TestMethod] | |||||
public void Preserve_Order() | |||||
{ | |||||
var queue = new BlockingQueue<string>(); | |||||
queue.Enqueue("a"); | |||||
queue.Enqueue("b"); | |||||
queue.Enqueue("c"); | |||||
Assert.AreEqual(3, queue.Count); | |||||
Assert.AreEqual("a", queue.Dequeue()); | |||||
Assert.AreEqual("b", queue.Dequeue()); | |||||
Assert.AreEqual("c", queue.Dequeue()); | |||||
} | |||||
[TestMethod] | |||||
public void Remove_First_Items() | |||||
{ | |||||
var queue = new BlockingQueue<string>(); | |||||
queue.Enqueue("a"); | |||||
queue.Enqueue("b"); | |||||
queue.Enqueue("c"); | |||||
Assert.AreEqual("a", queue.RemoveFirst()); | |||||
Assert.AreEqual("b", queue.RemoveFirst()); | |||||
Assert.AreEqual(1, queue.Count); | |||||
Assert.AreEqual("c", queue.Dequeue()); | |||||
} | |||||
[TestMethod] | |||||
public void Clear_Items() | |||||
{ | |||||
var queue = new BlockingQueue<string>(); | |||||
queue.Enqueue("a"); | |||||
queue.Enqueue("b"); | |||||
queue.Enqueue("c"); | |||||
Assert.AreEqual(3, queue.Count); | |||||
queue.Clear(); | |||||
Assert.AreEqual(0, queue.Count); | |||||
} | |||||
[TestMethod] | |||||
public async Task Wait_For_Item() | |||||
{ | |||||
var queue = new BlockingQueue<string>(); | |||||
string item = null; | |||||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
Task.Run(() => | |||||
{ | |||||
item = queue.Dequeue(); | |||||
}); | |||||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
await Task.Delay(100); | |||||
Assert.AreEqual(0, queue.Count); | |||||
Assert.AreEqual(null, item); | |||||
queue.Enqueue("x"); | |||||
await Task.Delay(100); | |||||
Assert.AreEqual("x", item); | |||||
Assert.AreEqual(0, queue.Count); | |||||
} | |||||
[TestMethod] | |||||
public void Wait_For_Times() | |||||
{ | |||||
var number = 0; | |||||
var queue = new BlockingQueue<int>(); | |||||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
Task.Run(async () => | |||||
{ | |||||
while (true) | |||||
{ | |||||
queue.Enqueue(1); | |||||
await Task.Delay(100); | |||||
} | |||||
}); | |||||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
while (number < 50) | |||||
{ | |||||
queue.Dequeue(); | |||||
Interlocked.Increment(ref number); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -13,6 +13,7 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj" /> | |||||
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" /> | <ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -0,0 +1,38 @@ | |||||
using MQTTnet.Extensions.ManagedClient; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||||
using MQTTnet.Server; | |||||
namespace MQTTnet.Core.Tests | |||||
{ | |||||
[TestClass] | |||||
public class ManagedMqttClientTests | |||||
{ | |||||
[TestMethod] | |||||
public async Task Drop_New_Messages_On_Full_Queue() | |||||
{ | |||||
var factory = new MqttFactory(); | |||||
var managedClient = factory.CreateManagedMqttClient(); | |||||
var clientOptions = new ManagedMqttClientOptionsBuilder() | |||||
.WithMaxPendingMessages(5) | |||||
.WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage); | |||||
clientOptions.WithClientOptions(o => o.WithTcpServer("localhost")); | |||||
await managedClient.StartAsync(clientOptions.Build()); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "1" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "2" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "3" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "4" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "5" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "6" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "7" }); | |||||
await managedClient.PublishAsync(new MqttApplicationMessage { Topic = "8" }); | |||||
Assert.AreEqual(5, managedClient.PendingApplicationMessagesCount); | |||||
} | |||||
} | |||||
} |
@@ -340,6 +340,8 @@ namespace MQTTnet.Core.Tests | |||||
} | } | ||||
}); | }); | ||||
await Task.Delay(100); | |||||
var retainedMessages = server.GetRetainedMessages(); | var retainedMessages = server.GetRetainedMessages(); | ||||
Assert.AreEqual(ClientCount, retainedMessages.Count); | Assert.AreEqual(ClientCount, retainedMessages.Count); | ||||
@@ -45,7 +45,9 @@ namespace MQTTnet.Core.Tests | |||||
stopwatch.Stop(); | stopwatch.Stop(); | ||||
times.Add(stopwatch.Elapsed); | times.Add(stopwatch.Elapsed); | ||||
stopwatch.Restart(); | stopwatch.Restart(); | ||||
} | |||||
} | |||||
await server.StopAsync(); | |||||
} | } | ||||
} | } | ||||
} | } |