diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs new file mode 100644 index 0000000..fa82c0a --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ApplicationMessageProcessedEventArgs.cs @@ -0,0 +1,19 @@ +using System; + +namespace MQTTnet.ManagedClient +{ + public class ApplicationMessageProcessedEventArgs : EventArgs + { + public ApplicationMessageProcessedEventArgs(MqttApplicationMessage applicationMessage, Exception exception) + { + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + Exception = exception; + } + + public MqttApplicationMessage ApplicationMessage { get; } + public Exception Exception { get; } + + public bool HasFailed => Exception != null; + public bool HasSucceeded => Exception == null; + } +} diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs index 9d67908..6f435d5 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs @@ -12,6 +12,8 @@ namespace MQTTnet.ManagedClient event EventHandler Connected; event EventHandler Disconnected; + event EventHandler ApplicationMessageProcessed; + Task StartAsync(IManagedMqttClientOptions options); Task StopAsync(); diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index a7f5a1c..b18d678 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -27,7 +27,7 @@ namespace MQTTnet.ManagedClient private IManagedMqttClientOptions _options; private bool _subscriptionsNotPushed; - + public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -43,6 +43,7 @@ namespace MQTTnet.ManagedClient public event EventHandler Connected; public event EventHandler Disconnected; public event EventHandler ApplicationMessageReceived; + public event EventHandler ApplicationMessageProcessed; public async Task StartAsync(IManagedMqttClientOptions options) { @@ -57,7 +58,7 @@ namespace MQTTnet.ManagedClient if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started."); _options = options; - + if (_options.Storage != null) { _storageManager = new ManagedMqttClientStorageManager(_options.Storage); @@ -65,7 +66,7 @@ namespace MQTTnet.ManagedClient } _connectionCancellationToken = new CancellationTokenSource(); - + #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed @@ -85,7 +86,7 @@ namespace MQTTnet.ManagedClient return Task.FromResult(0); } - + public async Task PublishAsync(IEnumerable applicationMessages) { if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages)); @@ -96,7 +97,7 @@ namespace MQTTnet.ManagedClient { await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false); } - + _messageQueue.Add(applicationMessage); } } @@ -188,7 +189,7 @@ namespace MQTTnet.ManagedClient StartPublishing(); - + return; } @@ -209,7 +210,7 @@ namespace MQTTnet.ManagedClient _logger.Error(exception, "Unhandled exception while maintaining connection."); } } - + private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try @@ -227,7 +228,7 @@ namespace MQTTnet.ManagedClient continue; } - await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); + await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); } } catch (OperationCanceledException) @@ -245,6 +246,7 @@ namespace MQTTnet.ManagedClient private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message) { + Exception transmitException = null; try { await _mqttClient.PublishAsync(message).ConfigureAwait(false); @@ -256,6 +258,8 @@ namespace MQTTnet.ManagedClient } catch (MqttCommunicationException exception) { + transmitException = exception; + _logger.Warning(exception, "Publishing application message failed."); if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) @@ -265,8 +269,13 @@ namespace MQTTnet.ManagedClient } catch (Exception exception) { + transmitException = exception; _logger.Error(exception, "Unhandled exception while publishing queued application message."); } + finally + { + ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException)); + } } private async Task PushSubscriptionsAsync()