Browse Source

Add ID property for the ManagedMqttApplicationMessage.

release/3.x.x
Christian Kratky 6 years ago
parent
commit
78360bc24a
8 changed files with 79 additions and 22 deletions
  1. +1
    -1
      Build/MQTTnet.nuspec
  2. +2
    -2
      Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs
  3. +2
    -0
      Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
  4. +2
    -2
      Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs
  5. +47
    -0
      Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs
  6. +16
    -8
      Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  7. +4
    -4
      Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs
  8. +5
    -5
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs

+ 1
- 1
Build/MQTTnet.nuspec View File

@@ -19,7 +19,7 @@
* [Client] Fixed wrong calculation for sending keep alive packets (thanks to @cstichlberger)
* [Client] A clean disconnect (via DisconnectAsync) will no longer throw an exception.
* [ManagedClient] The managed client is moved to a separate nuget package.
* [ManagedClient] Added an own message format with extended properties like ID.
* [ManagedClient] Added an own message format with extended properties like ID (BREAKING CHANGE).
* [ManagedClient] Fixed a loading issue of stored application messages (thanks to @JTrotta).
* [Server] Added support for other WebSocket sub protocol formats like mqttv-3.1.1 (thanks to @israellot).
* [Server] The takeover of an existing client sessions is now treated as a _clean_ disconnect of the previous client.


+ 2
- 2
Extensions/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedEventArgs.cs View File

@@ -4,13 +4,13 @@ namespace MQTTnet.Extensions.ManagedClient
{
public class ApplicationMessageProcessedEventArgs : EventArgs
{
public ApplicationMessageProcessedEventArgs(MqttApplicationMessage applicationMessage, Exception exception)
public ApplicationMessageProcessedEventArgs(ManagedMqttApplicationMessage applicationMessage, Exception exception)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
Exception = exception;
}

public MqttApplicationMessage ApplicationMessage { get; }
public ManagedMqttApplicationMessage ApplicationMessage { get; }
public Exception Exception { get; }

public bool HasFailed => Exception != null;


+ 2
- 0
Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs View File

@@ -20,5 +20,7 @@ namespace MQTTnet.Extensions.ManagedClient

Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters);
Task UnsubscribeAsync(IEnumerable<string> topics);

Task PublishAsync(IEnumerable<ManagedMqttApplicationMessage> applicationMessages);
}
}

+ 2
- 2
Extensions/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs View File

@@ -5,8 +5,8 @@ namespace MQTTnet.Extensions.ManagedClient
{
public interface IManagedMqttClientStorage
{
Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages);
Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages);

Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync();
Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync();
}
}

+ 47
- 0
Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttApplicationMessageBuilder.cs View File

@@ -0,0 +1,47 @@
using System;

namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttApplicationMessageBuilder
{
private Guid _id = Guid.NewGuid();
private MqttApplicationMessage _applicationMessage;

public ManagedMqttApplicationMessageBuilder WithId(Guid id)
{
_id = id;
return this;
}

public ManagedMqttApplicationMessageBuilder WithApplicationMessage(MqttApplicationMessage applicationMessage)
{
_applicationMessage = applicationMessage;
return this;
}

public ManagedMqttApplicationMessageBuilder WithApplicationMessage(Action<MqttApplicationMessageBuilder> builder)
{
if (builder == null) throw new ArgumentNullException(nameof(builder));

var internalBuilder = new MqttApplicationMessageBuilder();
builder(internalBuilder);

_applicationMessage = internalBuilder.Build();
return this;
}

public ManagedMqttApplicationMessage Build()
{
if (_applicationMessage == null)
{
throw new InvalidOperationException("The ApplicationMessage cannot be null.");
}

return new ManagedMqttApplicationMessage
{
Id = _id,
ApplicationMessage = _applicationMessage
};
}
}
}

+ 16
- 8
Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
{
private readonly BlockingCollection<MqttApplicationMessage> _messageQueue = new BlockingCollection<MqttApplicationMessage>();
private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly AsyncLock _subscriptionsLock = new AsyncLock();
private readonly List<string> _unsubscriptions = new List<string>();
@@ -99,7 +99,15 @@ namespace MQTTnet.Extensions.ManagedClient
return Task.FromResult(0);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
public Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

return PublishAsync(applicationMessages.Select(m =>
new ManagedMqttApplicationMessageBuilder().WithApplicationMessage(m).Build()));
}

public async Task PublishAsync(IEnumerable<ManagedMqttApplicationMessage> applicationMessages)
{
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

@@ -244,12 +252,12 @@ namespace MQTTnet.Extensions.ManagedClient
}
}

private async Task TryPublishQueuedMessageAsync(MqttApplicationMessage message)
private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
{
Exception transmitException = null;
try
{
await _mqttClient.PublishAsync(message).ConfigureAwait(false);
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);

if (_storageManager != null)
{
@@ -260,9 +268,9 @@ namespace MQTTnet.Extensions.ManagedClient
{
transmitException = exception;

_logger.Warning(exception, "Publishing application message failed.");
_logger.Warning(exception, $"Publishing application ({message.Id}) message failed.");

if (message.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
_messageQueue.Add(message);
}
@@ -270,7 +278,7 @@ namespace MQTTnet.Extensions.ManagedClient
catch (Exception exception)
{
transmitException = exception;
_logger.Error(exception, "Unhandled exception while publishing queued application message.");
_logger.Error(exception, $"Unhandled exception while publishing application message ({message.Id}).");
}
finally
{
@@ -294,7 +302,7 @@ namespace MQTTnet.Extensions.ManagedClient

_subscriptionsNotPushed = false;
}
if (!subscriptions.Any() && !unsubscriptions.Any())
{
return;


+ 4
- 4
Extensions/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs View File

@@ -8,7 +8,7 @@ namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClientStorageManager
{
private readonly List<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();
private readonly List<ManagedMqttApplicationMessage> _messages = new List<ManagedMqttApplicationMessage>();
private readonly AsyncLock _messagesLock = new AsyncLock();

private readonly IManagedMqttClientStorage _storage;
@@ -18,7 +18,7 @@ namespace MQTTnet.Extensions.ManagedClient
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
}

public async Task<List<MqttApplicationMessage>> LoadQueuedMessagesAsync()
public async Task<List<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
{
var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false);
_messages.AddRange(loadedMessages);
@@ -26,7 +26,7 @@ namespace MQTTnet.Extensions.ManagedClient
return _messages;
}

public async Task AddAsync(MqttApplicationMessage applicationMessage)
public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage)
{
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{
@@ -35,7 +35,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
}

public async Task RemoveAsync(MqttApplicationMessage applicationMessage)
public async Task RemoveAsync(ManagedMqttApplicationMessage applicationMessage)
{
using (await _messagesLock.LockAsync(CancellationToken.None).ConfigureAwait(false))
{


+ 5
- 5
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -76,23 +76,23 @@ namespace MQTTnet.TestApp.NetCore
{
private const string Filename = @"RetainedMessages.json";

public Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages)
public Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
{
File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
return Task.FromResult(0);
}

public Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync()
public Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
{
IList<MqttApplicationMessage> retainedMessages;
IList<ManagedMqttApplicationMessage> retainedMessages;
if (File.Exists(Filename))
{
var json = File.ReadAllText(Filename);
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
retainedMessages = JsonConvert.DeserializeObject<List<ManagedMqttApplicationMessage>>(json);
}
else
{
retainedMessages = new List<MqttApplicationMessage>();
retainedMessages = new List<ManagedMqttApplicationMessage>();
}

return Task.FromResult(retainedMessages);


Loading…
Cancel
Save