Browse Source

Merge

release/3.x.x
Christian Kratky 7 years ago
parent
commit
bddfadaefa
12 changed files with 303 additions and 8 deletions
  1. +5
    -0
      Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
  2. +5
    -0
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  3. +6
    -0
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
  4. +5
    -0
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  5. +2
    -0
      MQTTnet.Core/Client/IMqttClientFactory.cs
  6. +9
    -0
      MQTTnet.Core/Client/IMqttClientManaged.cs
  7. +12
    -0
      MQTTnet.Core/Client/IMqttClientQueuedStorage.cs
  8. +163
    -0
      MQTTnet.Core/Client/MqttClientManaged .cs
  9. +12
    -0
      MQTTnet.Core/Client/MqttClientManagedOptions.cs
  10. +84
    -0
      MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs
  11. +0
    -8
      MQTTnet.Core/MQTTnet.Core.csproj
  12. BIN
     

+ 5
- 0
Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs View File

@@ -21,6 +21,11 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

throw new NotSupportedException();
}
}

+ 5
- 0
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs View File

@@ -9,5 +9,10 @@ namespace MQTTnet
{
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 6
- 0
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs View File

@@ -21,6 +21,12 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}


throw new NotSupportedException();
}
}

+ 5
- 0
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs View File

@@ -9,5 +9,10 @@ namespace MQTTnet
{
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 2
- 0
MQTTnet.Core/Client/IMqttClientFactory.cs View File

@@ -3,5 +3,7 @@
public interface IMqttClientFactory
{
IMqttClient CreateMqttClient();

IMqttClientManaged CreateMqttManagedClient();
}
}

+ 9
- 0
MQTTnet.Core/Client/IMqttClientManaged.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientManaged : IMqttClient
{
//Task ConnectAsync(MqttClientManagedOptions options);
}
}

+ 12
- 0
MQTTnet.Core/Client/IMqttClientQueuedStorage.cs View File

@@ -0,0 +1,12 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientQueuedStorage
{
Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages);

Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync();
}
}

+ 163
- 0
MQTTnet.Core/Client/MqttClientManaged .cs View File

@@ -0,0 +1,163 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Client
{
public class MqttClientManaged: IMqttClientManaged
{
private MqttClientManagedOptions _options;
private int _latestPacketIdentifier;
private readonly BlockingCollection<MqttApplicationMessage> _inflightQueue;
private bool _usePersistance = false;
private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager;
private readonly MqttClient _baseMqttClient;

public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
_baseMqttClient = new MqttClient(communicationChannelFactory);
_baseMqttClient.Connected += BaseMqttClient_Connected;
_baseMqttClient.Disconnected += BaseMqttClient_Disconnected;
_baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived;
_inflightQueue = new BlockingCollection<MqttApplicationMessage>();
}

private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}

private void BaseMqttClient_Disconnected(object sender, EventArgs e)
{
Disconnected?.Invoke(this, e);
}

private void BaseMqttClient_Connected(object sender, EventArgs e)
{
Connected?.Invoke(this, e);
}

public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected => _baseMqttClient.IsConnected;


public async Task ConnectAsync(IMqttClientOptions options)
{
//TODO VERY BAD
_options = options as MqttClientManagedOptions;
this._usePersistance = _options.Storage != null;
await _baseMqttClient.ConnectAsync(options);
SetupOutgoingPacketProcessingAsync();

//load persistentMessages
if (_usePersistance)
{
if (_persistentMessagesManager == null)
_persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options);
await _persistentMessagesManager.LoadMessagesAsync();
await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
}
}

public async Task DisconnectAsync()
{
await _baseMqttClient.DisconnectAsync();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
await _baseMqttClient.UnsubscribeAsync(topicFilters);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
await InternalPublishAsync(applicationMessages, true);
}

private async Task InternalPublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages, bool appendIfUsePersistance)
{
ThrowIfNotConnected();

foreach (var applicationMessage in applicationMessages)
{
if (_usePersistance && appendIfUsePersistance)
await _persistentMessagesManager.SaveMessageAsync(applicationMessage);

_inflightQueue.Add(applicationMessage);
}
}

public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await _baseMqttClient.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}

private void SetupOutgoingPacketProcessingAsync()
{
Task.Factory.StartNew(
() => SendPackets(_baseMqttClient._cancellationTokenSource.Token),
_baseMqttClient._cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
}

private async Task SendPackets(CancellationToken cancellationToken)
{
MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
MqttApplicationMessage messageInQueue = null;

try
{
while (!cancellationToken.IsCancellationRequested)
{
messageInQueue = _inflightQueue.Take();
await _baseMqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
if (_usePersistance)
await _persistentMessagesManager.Remove(messageInQueue);
}
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
//message not send, equeue it again
if (messageInQueue != null)
_inflightQueue.Add(messageInQueue);
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
await DisconnectAsync().ConfigureAwait(false);
}
finally
{
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
}
}
}
}

+ 12
- 0
MQTTnet.Core/Client/MqttClientManagedOptions.cs View File

@@ -0,0 +1,12 @@

using System;

namespace MQTTnet.Core.Client
{
public class MqttClientManagedOptions: MqttClientTcpOptions
{
public bool UseAutoReconnect { get; set; }
public TimeSpan AutoReconnectDelay { get; set; }
public IMqttClientQueuedStorage Storage { get; set; }
}
}

+ 84
- 0
MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs View File

@@ -0,0 +1,84 @@
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Packets;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public class MqttClientQueuedPersistentMessagesManager
{
private readonly IList<MqttApplicationMessage> _persistedMessages = new List<MqttApplicationMessage>();
private readonly MqttClientManagedOptions _options;

public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}

public async Task LoadMessagesAsync()
{
try
{
var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync();
lock (_persistedMessages)
{
_persistedMessages.Clear();
foreach (var persistentMessage in persistentMessages)
{
_persistedMessages.Add(persistentMessage);
}
}
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while loading persistent messages.");
}
}

public async Task SaveMessageAsync(MqttApplicationMessage applicationMessage)
{
if (applicationMessage != null)
{
lock (_persistedMessages)
{
_persistedMessages.Add(applicationMessage);
}
}
try
{
if (_options.Storage != null)
{
await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages);
}
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClientQueuedPersistentMessagesManager), exception, "Unhandled exception while saving persistent messages.");
}
}

public List<MqttApplicationMessage> GetMessages()
{
var persistedMessages = new List<MqttApplicationMessage>();
lock (_persistedMessages)
{
foreach (var persistedMessage in _persistedMessages)
{
persistedMessages.Add(persistedMessage);
}
}

return persistedMessages;
}

public async Task Remove(MqttApplicationMessage message)
{
lock (_persistedMessages)
_persistedMessages.Remove(message);

await SaveMessageAsync(null);
}
}
}

+ 0
- 8
MQTTnet.Core/MQTTnet.Core.csproj View File

@@ -22,15 +22,7 @@
<PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|AnyCPU'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|x64'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x86'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x86'" />

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Any CPU|x86'" />

<ItemGroup>
<Folder Include="ManagedClient\" />


BIN
View File


Loading…
Cancel
Save