Przeglądaj źródła

Refactoring

Added almost of the hits. Options modified but must be reviewed completely.
release/3.x.x
Gerardo 7 lat temu
rodzic
commit
870ed00e3c
24 zmienionych plików z 273 dodań i 231 usunięć
  1. +2
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs
  2. +1
    -1
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  3. +2
    -2
      Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
  4. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs
  5. +1
    -1
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs
  6. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
  7. +1
    -1
      MQTTnet.Core/Client/IMqttClient.cs
  8. +1
    -1
      MQTTnet.Core/Client/IMqttClientFactory.cs
  9. +9
    -0
      MQTTnet.Core/Client/IMqttClientManaged.cs
  10. +0
    -9
      MQTTnet.Core/Client/IMqttClientQueued.cs
  11. +2
    -2
      MQTTnet.Core/Client/IMqttClientQueuedStorage.cs
  12. +1
    -1
      MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs
  13. +3
    -2
      MQTTnet.Core/Client/MqttClient.cs
  14. +163
    -0
      MQTTnet.Core/Client/MqttClientManaged .cs
  15. +12
    -0
      MQTTnet.Core/Client/MqttClientManagedOptions.cs
  16. +12
    -12
      MQTTnet.Core/Client/MqttClientOptions.cs
  17. +0
    -166
      MQTTnet.Core/Client/MqttClientQueued.cs
  18. +0
    -11
      MQTTnet.Core/Client/MqttClientQueuedOptions.cs
  19. +4
    -4
      MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs
  20. +24
    -2
      MQTTnet.Core/Client/MqttClientTcpOptions.cs
  21. +24
    -2
      MQTTnet.Core/Client/MqttClientWebSocketOptions.cs
  22. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs
  23. +5
    -6
      Tests/MQTTnet.TestApp.NetCore/Program.cs
  24. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 2
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttCommunicationAdapterFactory.cs Wyświetl plik

@@ -7,7 +7,7 @@ namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

@@ -21,7 +21,7 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientQueuedOptions queuedOptions)
if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs Wyświetl plik

@@ -121,7 +121,7 @@ namespace MQTTnet.Implementations
return _options.TlsOptions.AllowUntrustedCertificates;
}

private static X509CertificateCollection LoadCertificates(MqttClientOptions options)
private static X509CertificateCollection LoadCertificates(IMqttClientOptions options)
{
var certificates = new X509CertificateCollection();
if (options.TlsOptions.Certificates == null)


+ 2
- 2
Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs Wyświetl plik

@@ -10,9 +10,9 @@ namespace MQTTnet
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientQueued CreateMqttQueuedClient()
public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientQueued(new MqttCommunicationAdapterFactory());
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttCommunicationAdapterFactory.cs Wyświetl plik

@@ -7,7 +7,7 @@ namespace MQTTnet.Implementations
{
public class MqttCommunicationAdapterFactory : IMqttCommunicationAdapterFactory
{
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

@@ -21,7 +21,7 @@ namespace MQTTnet.Implementations
return new MqttChannelCommunicationAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}

if (options is MqttClientQueuedOptions queuedOptions)
if (options is MqttClientManagedOptions queuedOptions)
{
return new MqttChannelCommunicationAdapter(new MqttTcpChannel(queuedOptions), new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion });
}


+ 1
- 1
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttTcpChannel.cs Wyświetl plik

@@ -89,7 +89,7 @@ namespace MQTTnet.Implementations
RawReceiveStream = ReceiveStream;
}

private static Certificate LoadCertificate(MqttClientOptions options)
private static Certificate LoadCertificate(IMqttClientOptions options)
{
if (options.TlsOptions.Certificates == null || !options.TlsOptions.Certificates.Any())
{


+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs Wyświetl plik

@@ -10,9 +10,9 @@ namespace MQTTnet
return new MqttClient(new MqttCommunicationAdapterFactory());
}

public IMqttClientQueued CreateMqttQueuedClient()
public IMqttClientManaged CreateMqttManagedClient()
{
return new MqttClientQueued(new MqttCommunicationAdapterFactory());
return new MqttClientManaged(new MqttCommunicationAdapterFactory());
}
}
}

+ 1
- 1
MQTTnet.Core/Client/IMqttClient.cs Wyświetl plik

@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Client
event EventHandler Connected;
event EventHandler Disconnected;

Task ConnectAsync(MqttClientOptions options);
Task ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync();

Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters);


+ 1
- 1
MQTTnet.Core/Client/IMqttClientFactory.cs Wyświetl plik

@@ -4,6 +4,6 @@
{
IMqttClient CreateMqttClient();

IMqttClientQueued CreateMqttQueuedClient();
IMqttClientManaged CreateMqttManagedClient();
}
}

+ 9
- 0
MQTTnet.Core/Client/IMqttClientManaged.cs Wyświetl plik

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientManaged : IMqttClient
{
//Task ConnectAsync(MqttClientManagedOptions options);
}
}

+ 0
- 9
MQTTnet.Core/Client/IMqttClientQueued.cs Wyświetl plik

@@ -1,9 +0,0 @@
using System.Threading.Tasks;

namespace MQTTnet.Core.Client
{
public interface IMqttClientQueued: IMqttClient
{
Task ConnectAsync(MqttClientQueuedOptions options);
}
}

+ 2
- 2
MQTTnet.Core/Client/IMqttClientQueuedStorage.cs Wyświetl plik

@@ -5,8 +5,8 @@ namespace MQTTnet.Core.Client
{
public interface IMqttClientQueuedStorage
{
Task SaveInflightMessagesAsync(IList<MqttApplicationMessage> messages);
Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages);

Task<IList<MqttApplicationMessage>> LoadInflightMessagesAsync();
Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync();
}
}

+ 1
- 1
MQTTnet.Core/Client/IMqttCommunicationAdapterFactory.cs Wyświetl plik

@@ -4,6 +4,6 @@ namespace MQTTnet.Core.Client
{
public interface IMqttCommunicationAdapterFactory
{
IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options);
IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options);
}
}

+ 3
- 2
MQTTnet.Core/Client/MqttClient.cs Wyświetl plik

@@ -18,7 +18,7 @@ namespace MQTTnet.Core.Client
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly IMqttCommunicationAdapterFactory _communicationChannelFactory;

private MqttClientOptions _options;
private IMqttClientOptions _options;
private bool _isReceivingPackets;
private int _latestPacketIdentifier;
internal CancellationTokenSource _cancellationTokenSource;
@@ -35,7 +35,8 @@ namespace MQTTnet.Core.Client

public bool IsConnected => _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested;

public async Task ConnectAsync(MqttClientOptions options)

public async Task ConnectAsync(IMqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));



+ 163
- 0
MQTTnet.Core/Client/MqttClientManaged .cs Wyświetl plik

@@ -0,0 +1,163 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Client
{
public class MqttClientManaged: IMqttClientManaged
{
private MqttClientManagedOptions _options;
private int _latestPacketIdentifier;
private readonly BlockingCollection<MqttApplicationMessage> _inflightQueue;
private bool _usePersistance = false;
private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager;
private readonly MqttClient _baseMqttClient;

public MqttClientManaged(IMqttCommunicationAdapterFactory communicationChannelFactory)
{
_baseMqttClient = new MqttClient(communicationChannelFactory);
_baseMqttClient.Connected += BaseMqttClient_Connected;
_baseMqttClient.Disconnected += BaseMqttClient_Disconnected;
_baseMqttClient.ApplicationMessageReceived += BaseMqttClient_ApplicationMessageReceived;
_inflightQueue = new BlockingCollection<MqttApplicationMessage>();
}

private void BaseMqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
ApplicationMessageReceived?.Invoke(this, e);
}

private void BaseMqttClient_Disconnected(object sender, EventArgs e)
{
Disconnected?.Invoke(this, e);
}

private void BaseMqttClient_Connected(object sender, EventArgs e)
{
Connected?.Invoke(this, e);
}

public event EventHandler Connected;
public event EventHandler Disconnected;
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected => _baseMqttClient.IsConnected;


public async Task ConnectAsync(IMqttClientOptions options)
{
//TODO VERY BAD
_options = options as MqttClientManagedOptions;
this._usePersistance = _options.Storage != null;
await _baseMqttClient.ConnectAsync(options);
SetupOutgoingPacketProcessingAsync();

//load persistentMessages
if (_usePersistance)
{
if (_persistentMessagesManager == null)
_persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options);
await _persistentMessagesManager.LoadMessagesAsync();
await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
}
}

public async Task DisconnectAsync()
{
await _baseMqttClient.DisconnectAsync();
}

public async Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
await _baseMqttClient.UnsubscribeAsync(topicFilters);
}

public async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
await InternalPublishAsync(applicationMessages, true);
}

private async Task InternalPublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages, bool appendIfUsePersistance)
{
ThrowIfNotConnected();

foreach (var applicationMessage in applicationMessages)
{
if (_usePersistance && appendIfUsePersistance)
await _persistentMessagesManager.SaveMessageAsync(applicationMessage);

_inflightQueue.Add(applicationMessage);
}
}

public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await _baseMqttClient.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}

private void SetupOutgoingPacketProcessingAsync()
{
Task.Factory.StartNew(
() => SendPackets(_baseMqttClient._cancellationTokenSource.Token),
_baseMqttClient._cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
}

private async Task SendPackets(CancellationToken cancellationToken)
{
MqttNetTrace.Information(nameof(MqttClientManaged), "Start sending packets.");
MqttApplicationMessage messageInQueue = null;

try
{
while (!cancellationToken.IsCancellationRequested)
{
messageInQueue = _inflightQueue.Take();
await _baseMqttClient.PublishAsync(new List<MqttApplicationMessage>() { messageInQueue });
if (_usePersistance)
await _persistentMessagesManager.Remove(messageInQueue);
}
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
//message not send, equeue it again
if (messageInQueue != null)
_inflightQueue.Add(messageInQueue);
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
await DisconnectAsync().ConfigureAwait(false);
}
finally
{
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
}
}
}
}

+ 12
- 0
MQTTnet.Core/Client/MqttClientManagedOptions.cs Wyświetl plik

@@ -0,0 +1,12 @@

using System;

namespace MQTTnet.Core.Client
{
public class MqttClientManagedOptions: MqttClientTcpOptions
{
public bool UseAutoReconnect { get; set; }
public TimeSpan AutoReconnectDelay { get; set; }
public IMqttClientQueuedStorage Storage { get; set; }
}
}

+ 12
- 12
MQTTnet.Core/Client/MqttClientOptions.cs Wyświetl plik

@@ -1,26 +1,26 @@
using System;
using MQTTnet.Core.Serializer;
using MQTTnet.Core.Serializer;
using System;

namespace MQTTnet.Core.Client
{
public abstract class MqttClientOptions
public interface IMqttClientOptions
{
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();
MqttClientTlsOptions TlsOptions { get; set; }

public MqttApplicationMessage WillMessage { get; set; }
MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }
string UserName { get; set; }

public string Password { get; set; }
string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);
string ClientId { get; set; }

public bool CleanSession { get; set; } = true;
bool CleanSession { get; set; }

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);
TimeSpan KeepAlivePeriod { get; set; }

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
TimeSpan DefaultCommunicationTimeout { get; set; }

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;
MqttProtocolVersion ProtocolVersion { get; set; }
}
}

+ 0
- 166
MQTTnet.Core/Client/MqttClientQueued.cs Wyświetl plik

@@ -1,166 +0,0 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Exceptions;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Internal;

namespace MQTTnet.Core.Client
{
public class MqttClientQueued : MqttClient, IMqttClientQueued
{
private MqttClientQueuedOptions _options;
private int _latestPacketIdentifier;
private readonly ConcurrentQueue<MqttApplicationMessage> _inflightQueue;
private bool _usePersistance = false;
private MqttClientQueuedPersistentMessagesManager _persistentMessagesManager;

public MqttClientQueued(IMqttCommunicationAdapterFactory communicationChannelFactory) : base(communicationChannelFactory)
{
_inflightQueue = new ConcurrentQueue<MqttApplicationMessage>();
}


public async Task ConnectAsync(MqttClientQueuedOptions options)
{
try
{
_options = options;
this._usePersistance = _options.UsePersistence;
await base.ConnectAsync(options);
SetupOutgoingPacketProcessingAsync();

//load persistentMessages
if (_usePersistance)
{
if (_persistentMessagesManager == null)
_persistentMessagesManager = new MqttClientQueuedPersistentMessagesManager(_options);
await _persistentMessagesManager.LoadMessagesAsync();
await InternalPublishAsync(_persistentMessagesManager.GetMessages(), false);
}
}
catch (Exception)
{
await DisconnectAsync().ConfigureAwait(false);
throw;
}
}

public new async Task PublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages)
{
await InternalPublishAsync(applicationMessages, true);
}

private async Task InternalPublishAsync(IEnumerable<MqttApplicationMessage> applicationMessages, bool appendIfUsePersistance)
{
ThrowIfNotConnected();

foreach (var applicationMessage in applicationMessages)
{
if (_usePersistance && appendIfUsePersistance)
await _persistentMessagesManager.SaveMessageAsync(applicationMessage);

_inflightQueue.Enqueue(applicationMessage);
}
}

public new async Task<IList<MqttSubscribeResult>> SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
return await base.SubscribeAsync(topicFilters);
}

private void ThrowIfNotConnected()
{
if (!IsConnected) throw new MqttCommunicationException("The client is not connected.");
}

private ushort GetNewPacketIdentifier()
{
return (ushort)Interlocked.Increment(ref _latestPacketIdentifier);
}

private void SetupOutgoingPacketProcessingAsync()
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Factory.StartNew(
() => SendPackets(base._cancellationTokenSource.Token),
base._cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).ConfigureAwait(false);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}

private async Task SendPackets(CancellationToken cancellationToken)
{
MqttNetTrace.Information(nameof(MqttClient), "Start sending packets.");
MqttApplicationMessage messageToSend = null;
try
{
while (!cancellationToken.IsCancellationRequested)
{
while (_inflightQueue.TryDequeue(out messageToSend))
{
MqttQualityOfServiceLevel qosGroup = messageToSend.QualityOfServiceLevel;
MqttPublishPacket publishPacket = messageToSend.ToPublishPacket();
switch (qosGroup)
{
case MqttQualityOfServiceLevel.AtMostOnce:
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await base._adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, base._cancellationTokenSource.Token, publishPacket);
break;
}

case MqttQualityOfServiceLevel.AtLeastOnce:
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await base.SendAndReceiveAsync<MqttPubAckPacket>(publishPacket);
break;
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
var pubRecPacket = await base.SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await base.SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
break;
}
default:
{
throw new InvalidOperationException();
}
}
//delete from persistence
if (_usePersistance)
await _persistentMessagesManager.Remove(messageToSend);
}
};
}
catch (OperationCanceledException)
{
}
catch (MqttCommunicationException exception)
{
MqttNetTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while sending packets.");
//message not send, equeued again
if (messageToSend != null)
_inflightQueue.Enqueue(messageToSend);
}
catch (Exception exception)
{
MqttNetTrace.Error(nameof(MqttClient), exception, "Unhandled exception while sending packets.");
await DisconnectAsync().ConfigureAwait(false);
}
finally
{
MqttNetTrace.Information(nameof(MqttClient), "Stopped sending packets.");
}
}
}
}

+ 0
- 11
MQTTnet.Core/Client/MqttClientQueuedOptions.cs Wyświetl plik

@@ -1,11 +0,0 @@


namespace MQTTnet.Core.Client
{
public class MqttClientQueuedOptions: MqttClientTcpOptions
{
public bool UsePersistence { get; set; }

public IMqttClientQueuedStorage Storage { get; set; }
}
}

+ 4
- 4
MQTTnet.Core/Client/MqttClientQueuedPersistentMessagesManager.cs Wyświetl plik

@@ -10,9 +10,9 @@ namespace MQTTnet.Core.Client
public class MqttClientQueuedPersistentMessagesManager
{
private readonly IList<MqttApplicationMessage> _persistedMessages = new List<MqttApplicationMessage>();
private readonly MqttClientQueuedOptions _options;
private readonly MqttClientManagedOptions _options;

public MqttClientQueuedPersistentMessagesManager(MqttClientQueuedOptions options)
public MqttClientQueuedPersistentMessagesManager(MqttClientManagedOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
@@ -21,7 +21,7 @@ namespace MQTTnet.Core.Client
{
try
{
var persistentMessages = await _options.Storage.LoadInflightMessagesAsync();
var persistentMessages = await _options.Storage.LoadQueuedMessagesAsync();
lock (_persistedMessages)
{
_persistedMessages.Clear();
@@ -50,7 +50,7 @@ namespace MQTTnet.Core.Client
{
if (_options.Storage != null)
{
await _options.Storage.SaveInflightMessagesAsync(_persistedMessages);
await _options.Storage.SaveQueuedMessagesAsync(_persistedMessages);
}
}
catch (Exception exception)


+ 24
- 2
MQTTnet.Core/Client/MqttClientTcpOptions.cs Wyświetl plik

@@ -1,7 +1,29 @@
namespace MQTTnet.Core.Client
using MQTTnet.Core.Serializer;
using System;

namespace MQTTnet.Core.Client
{
public class MqttClientTcpOptions : MqttClientOptions
public class MqttClientTcpOptions : IMqttClientOptions
{
public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();

public MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }

public string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);

public bool CleanSession { get; set; } = true;

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;


public string Server { get; set; }

public int? Port { get; set; }


+ 24
- 2
MQTTnet.Core/Client/MqttClientWebSocketOptions.cs Wyświetl plik

@@ -1,7 +1,29 @@
namespace MQTTnet.Core.Client
using System;
using MQTTnet.Core.Serializer;

namespace MQTTnet.Core.Client
{
public class MqttClientWebSocketOptions : MqttClientOptions
public class MqttClientWebSocketOptions : IMqttClientOptions
{
public string Uri { get; set; }

public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();

public MqttApplicationMessage WillMessage { get; set; }

public string UserName { get; set; }

public string Password { get; set; }

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);

public bool CleanSession { get; set; } = true;

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);

public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311;

}
}

+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttCommunicationAdapterFactory.cs Wyświetl plik

@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Tests
_adapter = adapter;
}

public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(MqttClientOptions options)
public IMqttCommunicationAdapter CreateMqttCommunicationAdapter(IMqttClientOptions options)
{
return _adapter;
}


+ 5
- 6
Tests/MQTTnet.TestApp.NetCore/Program.cs Wyświetl plik

@@ -57,7 +57,7 @@ namespace MQTTnet.TestApp.NetCore

try
{
var options = new MqttClientQueuedOptions
var options = new MqttClientManagedOptions
{
Server = "192.168.0.14",
ClientId = "XYZ",
@@ -66,11 +66,10 @@ namespace MQTTnet.TestApp.NetCore
Password = "passworda",
KeepAlivePeriod = TimeSpan.FromSeconds(31),
DefaultCommunicationTimeout = TimeSpan.FromSeconds(20),
UsePersistence = true,
Storage = new TestStorage(),
};

var client = new MqttClientFactory().CreateMqttQueuedClient();
var client = new MqttClientFactory().CreateMqttManagedClient();
client.ApplicationMessageReceived += (s, e) =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
@@ -177,7 +176,7 @@ namespace MQTTnet.TestApp.NetCore

await client.SubscribeAsync(new List<TopicFilter>
{
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
new TopicFilter("#", MqttQualityOfServiceLevel.ExactlyOnce)
});

Console.WriteLine("### SUBSCRIBED ###");
@@ -299,7 +298,7 @@ namespace MQTTnet.TestApp.NetCore
string serializationFile = System.IO.Path.Combine(Environment.CurrentDirectory, "messages.bin");
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();

public Task<IList<MqttApplicationMessage>> LoadInflightMessagesAsync()
public Task<IList<MqttApplicationMessage>> LoadQueuedMessagesAsync()
{
//deserialize
// MqttApplicationMessage is not serializable
@@ -319,7 +318,7 @@ namespace MQTTnet.TestApp.NetCore
return Task.FromResult(_messages);
}

public Task SaveInflightMessagesAsync(IList<MqttApplicationMessage> messages)
public Task SaveQueuedMessagesAsync(IList<MqttApplicationMessage> messages)
{
_messages = messages;
//serialize


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs Wyświetl plik

@@ -41,7 +41,7 @@ namespace MQTTnet.TestApp.UniversalWindows

private async void Connect(object sender, RoutedEventArgs e)
{
MqttClientOptions options = null;
IMqttClientOptions options = null;
if (UseTcp.IsChecked == true)
{
options = new MqttClientTcpOptions


Ładowanie…
Anuluj
Zapisz