Browse Source

Refactor managed MQTT client

release/3.x.x
Christian Kratky 7 years ago
parent
commit
fe88b7551c
14 changed files with 206 additions and 219 deletions
  1. +0
    -5
      Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
  2. +3
    -2
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  3. +0
    -6
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
  4. +3
    -2
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  5. +4
    -2
      MQTTnet.Core/Client/IMqttClientFactory.cs
  6. +0
    -9
      MQTTnet.Core/Client/IMqttClientManaged.cs
  7. +0
    -163
      MQTTnet.Core/Client/MqttClientManaged .cs
  8. +0
    -12
      MQTTnet.Core/Client/MqttClientManagedOptions.cs
  9. +0
    -6
      MQTTnet.Core/MQTTnet.Core.csproj
  10. +15
    -0
      MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs
  11. +2
    -2
      MQTTnet.Core/ManagedClient/IManagedMqttClientStorage.cs
  12. +156
    -0
      MQTTnet.Core/ManagedClient/ManagedMqttClient.cs
  13. +8
    -10
      MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs
  14. +15
    -0
      MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs

+ 0
- 5
Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs View File

@@ -21,11 +21,6 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}

+ 3
- 2
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs View File

@@ -1,4 +1,5 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.ManagedClient;
using MQTTnet.Implementations;

namespace MQTTnet
@@ -10,9 +11,9 @@ namespace MQTTnet
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
public ManagedMqttClient CreateManagedMqttClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
return new ManagedMqttClient(new MqttCommunicationAdapterFactory());
}
}
}

+ 0
- 6
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs View File

@@ -21,12 +21,6 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}


throw new NotSupportedException();
}
}

+ 3
- 2
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs View File

@@ -1,4 +1,5 @@
using MQTTnet.Core.Client;
using MQTTnet.Core.ManagedClient;
using MQTTnet.Implementations;

namespace MQTTnet
@@ -10,9 +11,9 @@ namespace MQTTnet
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
public ManagedMqttClient CreateManagedMqttClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
return new ManagedMqttClient(new MqttCommunicationAdapterFactory());
}
}
}

+ 4
- 2
MQTTnet.Core/Client/IMqttClientFactory.cs View File

@@ -1,9 +1,11 @@
namespace MQTTnet.Core.Client
using MQTTnet.Core.ManagedClient;

namespace MQTTnet.Core.Client
{
public interface IMqttClientFactory
{
IMqttClient CreateMqttClient();

IMqttClientManaged CreateMqttManagedClient();
ManagedMqttClient CreateManagedMqttClient();
}
}

+ 0
- 9
MQTTnet.Core/Client/IMqttClientManaged.cs View File

@@ -1,9 +0,0 @@
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientManaged : IMqttClient
{
//Task ConnectAsync(MqttClientManagedOptions options);
}
}

+ 0
- 163
MQTTnet.Core/Client/MqttClientManaged .cs View File

@@ -1,163 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Client
{
public class MqttClientManaged: IMqttClientManaged
{
private MqttClientManagedOptions _options;
private int _latestPacketIdentifier;
private readonly BlockingCollection<MqttApplicationMessage> _inflightQueue;
private bool _usePersistance = false;
private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager;
private readonly MqttClient _baseMqttClient;

public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
_baseMqttClient = new MqttClient(communicationChannelFactory);
_baseMqttClient.Connected += BaseMqttClient_Connected;
_baseMqttClient.Disconnected += BaseMqttClient_Disconnected;
_baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived;
_inflightQueue = new BlockingCollection<MqttApplicationMessage>();
}

private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}

private void BaseMqttClient_Disconnected(object sender, EventArgs e)
{
Disconnected?.Invoke(this, e);
}

private void BaseMqttClient_Connected(object sender, EventArgs e)
{
Connected?.Invoke(this, e);
}

public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected => _baseMqttClient.IsConnected;


public async Task ConnectAsync(IMqttClientOptions options)
{
//TODO VERY BAD
_options = options as MqttClientManagedOptions;
this._usePersistance = _options.Storage != null;
await _baseMqttClient.ConnectAsync(options);
SetupOutgoingPacketProcessingAsync();

//load persistentMessages
if (_usePersistance)
{
if (_persistentMessagesManager == null)
_persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options);
await _persistentMessagesManager.LoadMessagesAsync();
await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
}
}

public async Task DisconnectAsync()
{
await _baseMqttClient.DisconnectAsync();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
await _baseMqttClient.UnsubscribeAsync(topicFilters);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
await InternalPublishAsync(applicationMessages, true);
}

private async Task InternalPublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages, bool appendIfUsePersistance)
{
ThrowIfNotConnected();

foreach (var applicationMessage in applicationMessages)
{
if (_usePersistance && appendIfUsePersistance)
await _persistentMessagesManager.SaveMessageAsync(applicationMessage);

_inflightQueue.Add(applicationMessage);
}
}

public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await _baseMqttClient.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}

private void SetupOutgoingPacketProcessingAsync()
{
Task.Factory.StartNew(
() => SendPackets(_baseMqttClient._cancellationTokenSource.Token),
_baseMqttClient._cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
}

private async Task SendPackets(CancellationToken cancellationToken)
{
MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
MqttApplicationMessage messageInQueue = null;

try
{
while (!cancellationToken.IsCancellationRequested)
{
messageInQueue = _inflightQueue.Take();
await _baseMqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
if (_usePersistance)
await _persistentMessagesManager.Remove(messageInQueue);
}
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
//message not send, equeue it again
if (messageInQueue != null)
_inflightQueue.Add(messageInQueue);
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
await DisconnectAsync().ConfigureAwait(false);
}
finally
{
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
}
}
}
}

+ 0
- 12
MQTTnet.Core/Client/MqttClientManagedOptions.cs View File

@@ -1,12 +0,0 @@

using System;

namespace MQTTnet.Core.Client
{
public class MqttClientManagedOptions: MqttClientTcpOptions
{
public bool UseAutoReconnect { get; set; }
public TimeSpan AutoReconnectDelay { get; set; }
public IMqttClientQueuedStorage Storage { get; set; }
}
}

+ 0
- 6
MQTTnet.Core/MQTTnet.Core.csproj View File

@@ -22,10 +22,4 @@
<PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup>



<ItemGroup>
<Folder Include="ManagedClient\" />
</ItemGroup>

</Project>

+ 15
- 0
MQTTnet.Core/ManagedClient/IManagedMqttClientOptions.cs View File

@@ -0,0 +1,15 @@
using System;
using MQTTnet.Core.Client;

namespace MQTTnet.Core.ManagedClient
{
public interface IManagedMqttClientOptions
{
IMqttClientOptions ClientOptions { get; }

bool UseAutoReconnect { get; }
TimeSpan AutoReconnectDelay { get; }

IManagedMqttClientStorage Storage { get; }
}
}

MQTTnet.Core/Client/IMqttClientQueuedStorage.cs → MQTTnet.Core/ManagedClient/IManagedMqttClientStorage.cs View File

@@ -1,9 +1,9 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
namespace MQTTnet.Core.ManagedClient
{
public interface IMqttClientQueuedStorage
public interface IManagedMqttClientStorage
{
Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages);


+ 156
- 0
MQTTnet.Core/ManagedClient/ManagedMqttClient.cs View File

@@ -0,0 +1,156 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Client;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.ManagedClient
{
public class ManagedMqttClient
{
private readonly List<MqttApplicationMessage> _messageQueue = new List<MqttApplicationMessage>();
private readonly AutoResetEvent _messageQueueGate = new AutoResetEvent(false);
private readonly MqttClient _mqttClient;

private IManagedMqttClientOptions _options;
public ManagedMqttClient(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
if (communicationChannelFactory == null) throw new ArgumentNullException(nameof(communicationChannelFactory));

_mqttClient = new MqttClient(communicationChannelFactory);
_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
}

private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}

private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
{
//Disconnected?.Invoke(this, e);
}

private void OnConnected(object sender, EventArgs e)
{
Connected?.Invoke(this, e);
}

public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected => _mqttClient.IsConnected;


public void Start(IManagedMqttClientOptions options)
{
}

public void Stop()
{
}

public async Task ConnectAsync(IManagedMqttClientOptions options)
{
//////TODO VERY BAD
////_options = options as ManagedMqttClientTcpOptions;
////this._usePersistance = _options.Storage != null;
////await _mqttClient.ConnectAsync(options);
////SetupOutgoingPacketProcessingAsync();

//////load persistentMessages
////////if (_usePersistance)
////////{
//////// if (_persistentMessagesManager == null)
//////// _persistentMessagesManager = new ManagedMqttClientMessagesManager(_options);
//////// await _persistentMessagesManager.LoadMessagesAsync();
//////// await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
////////}
}

public async Task DisconnectAsync()
{
await _mqttClient.DisconnectAsync();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
// TODO: Move all subscriptions to list an subscribe after connection has lost. But only if server session is new.
await _mqttClient.UnsubscribeAsync(topicFilters);
}

public void Enqueue(IEnumerable<MqttApplicationMessage> applicationMessages)
{
ThrowIfNotConnected();

_messageQueue.AddRange(applicationMessages);
_options.Storage?.SaveQueuedMessagesAsync(_messageQueue.ToList());

_messageQueueGate.Set();
}
public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await _mqttClient.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private void SetupOutgoingPacketProcessingAsync()
{
//Task.Factory.StartNew(
// () => SendPackets(_mqttClient._cancellationTokenSource.Token),
// _mqttClient._cancellationTokenSource.Token,
// TaskCreationOptions.LongRunning,
// TaskScheduler.Default).ConfigureAwait(false);
}

private async Task SendPackets(CancellationToken cancellationToken)
{
//MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
//MqttApplicationMessage messageInQueue = null;

//try
//{
// while (!cancellationToken.IsCancellationRequested)
// {
// messageInQueue = _inflightQueue.Take();
// await _mqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
// if (_usePersistance)
// await _persistentMessagesManager.Remove(messageInQueue);
// }
//}
//catch (OperationCanceledException)
//{
//}
//catch (MqttCommunicationException exception)
//{
// MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
// //message not send, equeue it again
// if (messageInQueue != null)
// _inflightQueue.Add(messageInQueue);
//}
//catch (Exception exception)
//{
// MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
// await DisconnectAsync().ConfigureAwait(false);
//}
//finally
//{
// MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
//}
}
}
}

MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs → MQTTnet.Core/ManagedClient/ManagedMqttClientMessagesManager.cs View File

@@ -1,18 +1,16 @@
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Packets;
using System;
using System.Linq;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Diagnostics;

namespace MQTTnet.Core.Client
namespace MQTTnet.Core.ManagedClient
{
public class MqttClientQueuedPersistentMessagesManager
public class ManagedMqttClientMessagesManager
{
private readonly IList<MqttApplicationMessage> _persistedMessages = new List<MqttApplicationMessage>();
private readonly MqttClientManagedOptions _options;
private readonly ManagedMqttClientOptions _options;

public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options)
public ManagedMqttClientMessagesManager(ManagedMqttClientOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
@@ -33,7 +31,7 @@ namespace MQTTnet.Core.Client
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages.");
MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while loading persistent messages.");
}
}

@@ -55,7 +53,7 @@ namespace MQTTnet.Core.Client
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages.");
MqttNetTrace.Error(nameof(ManagedMqttClientMessagesManager), exception, "Unhandled exception while saving persistent messages.");
}
}


+ 15
- 0
MQTTnet.Core/ManagedClient/ManagedMqttClientOptions.cs View File

@@ -0,0 +1,15 @@
using System;
using MQTTnet.Core.Client;

namespace MQTTnet.Core.ManagedClient
{
public class ManagedMqttClientOptions : IManagedMqttClientOptions
{
public IMqttClientOptions ClientOptions { get; set; }

public bool UseAutoReconnect { get; set; } = true;
public TimeSpan AutoReconnectDelay { get; set; } = TimeSpan.FromSeconds(5);

public IManagedMqttClientStorage Storage { get; set; }
}
}

Loading…
Cancel
Save