From 78360bc24ab2772fd44b4d47e22a8e0ec2a42950 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Wed, 6 Jun 2018 21:44:25 +0200 Subject: [PATCH] Add ID property for the ManagedMqttApplicationMessage. --- Build/MQTTnet.nuspec | 2 +- .../ApplicationMessageProcessedEventArgs.cs | 4 +- .../IManagedMqttClient.cs | 2 + .../IManagedMqttClientStorage.cs | 4 +- .../ManagedMqttApplicationMessageBuilder.cs | 47 +++++++++++++++++++ .../ManagedMqttClient.cs | 24 ++++++---- .../ManagedMqttClientStorageManager.cs | 8 ++-- .../ManagedClientTest.cs | 10 ++-- 8 files changed, 79 insertions(+), 22 deletions(-) create mode 100644 Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index 8e09d92..71ec12d 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -19,7 +19,7 @@ * [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger) * [Client] A clean disconnect (via DisconnectAsync) will no longer throw an exception. * [ManagedClient] The managed client is moved to a separate nuget package. -* [ManagedClient] Added an own message format with extended properties like ID. +* [ManagedClient] Added an own message format with extended properties like ID (BREAKING CHANGE). * [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta). * [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot). * [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client. diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs b/Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs index 6bc7004..6eb97d2 100644 --- a/Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs +++ b/Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs @@ -4,13 +4,13 @@ namespace MQTTnet.Extensions.ManagedClient { public class ApplicationMessageProcessedEventArgs : EventArgs { - public ApplicationMessageProcessedEventArgs(MqttApplicationMessage applicationMessage, Exception exception) + public ApplicationMessageProcessedEventArgs(ManagedMqttApplicationMessage applicationMessage, Exception exception) { ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); Exception = exception; } - public MqttApplicationMessage ApplicationMessage { get; } + public ManagedMqttApplicationMessage ApplicationMessage { get; } public Exception Exception { get; } public bool HasFailed => Exception != null; diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs b/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs index 1e15daa..ff9f530 100644 --- a/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs +++ b/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs @@ -20,5 +20,7 @@ namespace MQTTnet.Extensions.ManagedClient Task SubscribeAsync(IEnumerable topicFilters); Task UnsubscribeAsync(IEnumerable topics); + + Task PublishAsync(IEnumerable applicationMessages); } } \ No newline at end of file diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs b/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs index 2127054..dc44001 100644 --- a/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs +++ b/Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs @@ -5,8 +5,8 @@ namespace MQTTnet.Extensions.ManagedClient { public interface IManagedMqttClientStorage { - Task SaveQueuedMessagesAsync(IList messages); + Task SaveQueuedMessagesAsync(IList messages); - Task> LoadQueuedMessagesAsync(); + Task> LoadQueuedMessagesAsync(); } } diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs new file mode 100644 index 0000000..bf67f70 --- /dev/null +++ b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs @@ -0,0 +1,47 @@ +using System; + +namespace MQTTnet.Extensions.ManagedClient +{ + public class ManagedMqttApplicationMessageBuilder + { + private Guid _id = Guid.NewGuid(); + private MqttApplicationMessage _applicationMessage; + + public ManagedMqttApplicationMessageBuilder WithId(Guid id) + { + _id = id; + return this; + } + + public ManagedMqttApplicationMessageBuilder WithApplicationMessage(MqttApplicationMessage applicationMessage) + { + _applicationMessage = applicationMessage; + return this; + } + + public ManagedMqttApplicationMessageBuilder WithApplicationMessage(Action builder) + { + if (builder == null) throw new ArgumentNullException(nameof(builder)); + + var internalBuilder = new MqttApplicationMessageBuilder(); + builder(internalBuilder); + + _applicationMessage = internalBuilder.Build(); + return this; + } + + public ManagedMqttApplicationMessage Build() + { + if (_applicationMessage == null) + { + throw new InvalidOperationException("The ApplicationMessage cannot be null."); + } + + return new ManagedMqttApplicationMessage + { + Id = _id, + ApplicationMessage = _applicationMessage + }; + } + } +} diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 0303caf..a9693a8 100644 --- a/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -14,7 +14,7 @@ namespace MQTTnet.Extensions.ManagedClient { public class ManagedMqttClient : IManagedMqttClient { - private readonly BlockingCollection _messageQueue = new BlockingCollection(); + private readonly BlockingCollection _messageQueue = new BlockingCollection(); private readonly Dictionary _subscriptions = new Dictionary(); private readonly AsyncLock _subscriptionsLock = new AsyncLock(); private readonly List _unsubscriptions = new List(); @@ -99,7 +99,15 @@ namespace MQTTnet.Extensions.ManagedClient return Task.FromResult(0); } - public async Task PublishAsync(IEnumerable applicationMessages) + public Task PublishAsync(IEnumerable applicationMessages) + { + if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); + + return PublishAsync(applicationMessages.Select(m => + new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(m).Build())); + } + + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); @@ -244,12 +252,12 @@ namespace MQTTnet.Extensions.ManagedClient } } - private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) + private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message) { Exception transmitException = null; try { - await _mqttClient.PublishAsync(message).ConfigureAwait(false); + await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); if (_storageManager != null) { @@ -260,9 +268,9 @@ namespace MQTTnet.Extensions.ManagedClient { transmitException = exception; - _logger.Warning(exception, "Publishing application message failed."); + _logger.Warning(exception, $"Publishing application ({message.Id}) message failed."); - if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) + if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { _messageQueue.Add(message); } @@ -270,7 +278,7 @@ namespace MQTTnet.Extensions.ManagedClient catch (Exception exception) { transmitException = exception; - _logger.Error(exception, "Unhandled exception while publishing queued application message."); + _logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id})."); } finally { @@ -294,7 +302,7 @@ namespace MQTTnet.Extensions.ManagedClient _subscriptionsNotPushed = false; } - + if (!subscriptions.Any() && !unsubscriptions.Any()) { return; diff --git a/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs index 4d13c33..0f4e33e 100644 --- a/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs +++ b/Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs @@ -8,7 +8,7 @@ namespace MQTTnet.Extensions.ManagedClient { public class ManagedMqttClientStorageManager { - private readonly List _messages = new List(); + private readonly List _messages = new List(); private readonly AsyncLock _messagesLock = new AsyncLock(); private readonly IManagedMqttClientStorage _storage; @@ -18,7 +18,7 @@ namespace MQTTnet.Extensions.ManagedClient _storage = storage ?? throw new ArgumentNullException(nameof(storage)); } - public async Task> LoadQueuedMessagesAsync() + public async Task> LoadQueuedMessagesAsync() { var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false); _messages.AddRange(loadedMessages); @@ -26,7 +26,7 @@ namespace MQTTnet.Extensions.ManagedClient return _messages; } - public async Task AddAsync(MqttApplicationMessage applicationMessage) + public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage) { using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { @@ -35,7 +35,7 @@ namespace MQTTnet.Extensions.ManagedClient } } - public async Task RemoveAsync(MqttApplicationMessage applicationMessage) + public async Task RemoveAsync(ManagedMqttApplicationMessage applicationMessage) { using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { diff --git a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs index 8efdebe..60ce27d 100644 --- a/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs +++ b/Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs @@ -76,23 +76,23 @@ namespace MQTTnet.TestApp.NetCore { private const string Filename = @"RetainedMessages.json"; - public Task SaveQueuedMessagesAsync(IList messages) + public Task SaveQueuedMessagesAsync(IList messages) { File.WriteAllText(Filename, JsonConvert.SerializeObject(messages)); return Task.FromResult(0); } - public Task> LoadQueuedMessagesAsync() + public Task> LoadQueuedMessagesAsync() { - IList retainedMessages; + IList retainedMessages; if (File.Exists(Filename)) { var json = File.ReadAllText(Filename); - retainedMessages = JsonConvert.DeserializeObject>(json); + retainedMessages = JsonConvert.DeserializeObject>(json); } else { - retainedMessages = new List(); + retainedMessages = new List(); } return Task.FromResult(retainedMessages);