@@ -2,7 +2,7 @@ | |||
<package > | |||
<metadata> | |||
<id>MQTTnet.AspNetCore</id> | |||
<version>2.7.3</version> | |||
<version>2.7.4</version> | |||
<authors>Christian Kratky</authors> | |||
<owners>Christian Kratky</owners> | |||
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> | |||
@@ -10,13 +10,13 @@ | |||
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | |||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | |||
<description>This is a support library to integrate MQTTnet into AspNetCore.</description> | |||
<releaseNotes>* Updated to MQTTnet 2.7.3. | |||
<releaseNotes>* Updated to MQTTnet 2.7.4. | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
<dependencies> | |||
<group targetFramework="netstandard2.0"> | |||
<dependency id="MQTTnet" version="2.7.3" /> | |||
<dependency id="MQTTnet" version="2.7.4" /> | |||
</group> | |||
</dependencies> | |||
</metadata> | |||
@@ -2,7 +2,7 @@ | |||
<package > | |||
<metadata> | |||
<id>MQTTnet</id> | |||
<version>2.7.3</version> | |||
<version>2.7.4</version> | |||
<authors>Christian Kratky</authors> | |||
<owners>Christian Kratky</owners> | |||
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> | |||
@@ -10,14 +10,13 @@ | |||
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl> | |||
<requireLicenseAcceptance>false</requireLicenseAcceptance> | |||
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> | |||
<releaseNotes>* [Core] Add several new extension methods. | |||
* [Client] Fixed an issue in _ManagedMqttClientOptionsBuilder_ when using _WithClientOptions_ and an options builder. | |||
* [Client] Added the "IsStarted" property for the managed client. | |||
* [Client] Optimized stream buffer for UWP apps. | |||
* [Client] Added the _BufferSize_ to the TCP options. | |||
* [Client] Fixed a race condition which leads to exceptions when reconnecting rapidly. | |||
* [Server] Fixed a race condition which leads to exceptions when clients are reconnecting rapidly. | |||
* [Core] Fixed some issues in stream and socket handling. | |||
<releaseNotes> * [Client] Fixed a deadlock while the client disconnects. | |||
* [Client] Fixed broken support for protocol version 3.1.0. | |||
* [Server] The _MqttTcpServerAdapter_ is now added to the ASP.NET services. | |||
* [Server] _MqttServerAdapter_ is renamed to _MqttTcpServerAdapter_ (BREAKING CHANGE!). | |||
* [Server] The server no longer sends the will message of a client if the disconnect was clean (via _Disconnect_ packet). | |||
* [Server] The application message interceptor now allows closing the connection. | |||
* [Server] Added a new flag for the _ClientDisconnected_ event which contains a value indicating whether the disconnect was clean (via _Disconnect_ packet). | |||
</releaseNotes> | |||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | |||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||
@@ -46,7 +45,7 @@ | |||
<group targetFramework="net461"> | |||
</group> | |||
</dependencies> | |||
</metadata> | |||
@@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Server; | |||
using MQTTnet.Implementations; | |||
namespace MQTTnet.AspNetCore | |||
{ | |||
@@ -20,7 +21,9 @@ namespace MQTTnet.AspNetCore | |||
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>()); | |||
services.AddSingleton<MqttWebSocketServerAdapter>(); | |||
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>()); | |||
services.AddSingleton<MqttTcpServerAdapter>(); | |||
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>()); | |||
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttTcpServerAdapter>()); | |||
return services; | |||
} | |||
@@ -36,7 +36,7 @@ namespace MQTTnet.Adapter | |||
public Task ConnectAsync(TimeSpan timeout) | |||
{ | |||
ThrowIfDisposed(); | |||
_logger.Trace<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout); | |||
_logger.Verbose<MqttChannelAdapter>("Connecting [Timeout={0}]", timeout); | |||
return ExecuteAndWrapExceptionAsync(() => _channel.ConnectAsync().TimeoutAfter(timeout)); | |||
} | |||
@@ -44,7 +44,7 @@ namespace MQTTnet.Adapter | |||
public Task DisconnectAsync(TimeSpan timeout) | |||
{ | |||
ThrowIfDisposed(); | |||
_logger.Trace<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout); | |||
_logger.Verbose<MqttChannelAdapter>("Disconnecting [Timeout={0}]", timeout); | |||
return ExecuteAndWrapExceptionAsync(() => _channel.DisconnectAsync().TimeoutAfter(timeout)); | |||
} | |||
@@ -70,7 +70,7 @@ namespace MQTTnet.Adapter | |||
continue; | |||
} | |||
_logger.Trace<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout); | |||
_logger.Verbose<MqttChannelAdapter>("TX >>> {0} [Timeout={1}]", packet, timeout); | |||
var chunks = PacketSerializer.Serialize(packet); | |||
foreach (var chunk in chunks) | |||
@@ -135,7 +135,7 @@ namespace MQTTnet.Adapter | |||
throw new MqttProtocolViolationException("Received malformed packet."); | |||
} | |||
_logger.Trace<MqttChannelAdapter>("RX <<< {0}", packet); | |||
_logger.Verbose<MqttChannelAdapter>("RX <<< {0}", packet); | |||
} | |||
finally | |||
{ | |||
@@ -5,6 +5,6 @@ namespace MQTTnet.Client | |||
{ | |||
public interface IMqttClientAdapterFactory | |||
{ | |||
IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger); | |||
IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger); | |||
} | |||
} |
@@ -13,7 +13,7 @@ namespace MQTTnet.Client | |||
TimeSpan CommunicationTimeout { get; } | |||
TimeSpan KeepAlivePeriod { get; } | |||
TimeSpan? KeepAliveSendInterval { get; set; } | |||
TimeSpan? KeepAliveSendInterval { get; } | |||
MqttProtocolVersion ProtocolVersion { get; } | |||
@@ -57,16 +57,16 @@ namespace MQTTnet.Client | |||
_packetIdentifierProvider.Reset(); | |||
_packetDispatcher.Reset(); | |||
_adapter = _adapterFactory.CreateClientAdapter(options.ChannelOptions, _logger); | |||
_adapter = _adapterFactory.CreateClientAdapter(options, _logger); | |||
_logger.Trace<MqttClient>("Trying to connect with server."); | |||
_logger.Verbose<MqttClient>("Trying to connect with server."); | |||
await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); | |||
_logger.Trace<MqttClient>("Connection with server established."); | |||
_logger.Verbose<MqttClient>("Connection with server established."); | |||
await StartReceivingPacketsAsync().ConfigureAwait(false); | |||
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); | |||
_logger.Trace<MqttClient>("MQTT connection with server established."); | |||
_logger.Verbose<MqttClient>("MQTT connection with server established."); | |||
_sendTracker.Restart(); | |||
@@ -77,12 +77,14 @@ namespace MQTTnet.Client | |||
IsConnected = true; | |||
Connected?.Invoke(this, new MqttClientConnectedEventArgs(connectResponse.IsSessionPresent)); | |||
_logger.Info<MqttClient>("Connected."); | |||
return new MqttClientConnectResult(connectResponse.IsSessionPresent); | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.Error<MqttClient>(exception, "Error while connecting with server."); | |||
await DisconnectInternalAsync(exception).ConfigureAwait(false); | |||
await DisconnectInternalAsync(null, exception).ConfigureAwait(false); | |||
throw; | |||
} | |||
@@ -104,7 +106,7 @@ namespace MQTTnet.Client | |||
} | |||
finally | |||
{ | |||
await DisconnectInternalAsync(null).ConfigureAwait(false); | |||
await DisconnectInternalAsync(null, null).ConfigureAwait(false); | |||
} | |||
} | |||
@@ -159,7 +161,7 @@ namespace MQTTnet.Client | |||
case MqttQualityOfServiceLevel.AtMostOnce: | |||
{ | |||
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] | |||
await SendAsync((MqttPublishPacket[])qosGroup.ToArray()).ConfigureAwait(false); | |||
await SendAsync(qosGroup.Cast<MqttBasePacket>().ToArray()).ConfigureAwait(false); | |||
break; | |||
} | |||
case MqttQualityOfServiceLevel.AtLeastOnce: | |||
@@ -236,33 +238,48 @@ namespace MQTTnet.Client | |||
if (IsConnected) throw new MqttProtocolViolationException(message); | |||
} | |||
private async Task DisconnectInternalAsync(Exception exception) | |||
private async Task DisconnectInternalAsync(Task sender, Exception exception) | |||
{ | |||
await _disconnectLock.WaitAsync(); | |||
var clientWasConnected = IsConnected; | |||
try | |||
{ | |||
IsConnected = false; | |||
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) | |||
{ | |||
return; | |||
} | |||
_cancellationTokenSource.Cancel(false); | |||
} | |||
catch (Exception adapterException) | |||
{ | |||
_logger.Warning<MqttClient>(adapterException, "Error while disconnecting from adapter."); | |||
} | |||
finally | |||
{ | |||
_disconnectLock.Release(); | |||
} | |||
var clientWasConnected = IsConnected; | |||
IsConnected = false; | |||
if (_packetReceiverTask != null) | |||
try | |||
{ | |||
if (_packetReceiverTask != null && _packetReceiverTask != sender) | |||
{ | |||
Task.WaitAll(_packetReceiverTask); | |||
_packetReceiverTask.Wait(); | |||
} | |||
if (_keepAliveMessageSenderTask != null) | |||
if (_keepAliveMessageSenderTask != null && _keepAliveMessageSenderTask != sender) | |||
{ | |||
Task.WaitAll(_keepAliveMessageSenderTask); | |||
_keepAliveMessageSenderTask.Wait(); | |||
} | |||
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); | |||
_logger.Trace<MqttClient>("Disconnected from adapter."); | |||
if (_adapter != null) | |||
{ | |||
await _adapter.DisconnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); | |||
} | |||
_logger.Verbose<MqttClient>("Disconnected from adapter."); | |||
} | |||
catch (Exception adapterException) | |||
{ | |||
@@ -272,12 +289,9 @@ namespace MQTTnet.Client | |||
{ | |||
_adapter?.Dispose(); | |||
_adapter = null; | |||
_cancellationTokenSource?.Dispose(); | |||
_cancellationTokenSource = null; | |||
_disconnectLock.Release(); | |||
_logger.Info<MqttClient>("Disconnected."); | |||
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); | |||
} | |||
@@ -287,8 +301,6 @@ namespace MQTTnet.Client | |||
{ | |||
try | |||
{ | |||
_logger.Info<MqttClient>("Received <<< {0}", packet); | |||
if (packet is MqttPublishPacket publishPacket) | |||
{ | |||
await ProcessReceivedPublishPacketAsync(publishPacket).ConfigureAwait(false); | |||
@@ -395,7 +407,7 @@ namespace MQTTnet.Client | |||
private async Task SendKeepAliveMessagesAsync() | |||
{ | |||
_logger.Info<MqttClient>("Start sending keep alive packets."); | |||
_logger.Verbose<MqttClient>("Start sending keep alive packets."); | |||
try | |||
{ | |||
@@ -415,37 +427,31 @@ namespace MQTTnet.Client | |||
await Task.Delay(keepAliveSendInterval, _cancellationTokenSource.Token).ConfigureAwait(false); | |||
} | |||
} | |||
catch (OperationCanceledException) | |||
{ | |||
} | |||
catch (Exception exception) | |||
{ | |||
if (_cancellationTokenSource.Token.IsCancellationRequested) | |||
if (exception is OperationCanceledException) | |||
{ | |||
return; | |||
} | |||
if (exception is MqttCommunicationException) | |||
else if (exception is MqttCommunicationException) | |||
{ | |||
_logger.Warning<MqttClient>(exception, "MQTT communication exception while sending/receiving keep alive packets."); | |||
} | |||
else | |||
{ | |||
_logger.Warning<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets."); | |||
_logger.Error<MqttClient>(exception, "Unhandled exception while sending/receiving keep alive packets."); | |||
} | |||
await DisconnectInternalAsync(exception).ConfigureAwait(false); | |||
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
_logger.Info<MqttClient>("Stopped sending keep alive packets."); | |||
_logger.Verbose<MqttClient>("Stopped sending keep alive packets."); | |||
} | |||
} | |||
private async Task ReceivePacketsAsync() | |||
{ | |||
_logger.Info<MqttClient>("Start receiving packets."); | |||
_logger.Verbose<MqttClient>("Start receiving packets."); | |||
try | |||
{ | |||
@@ -463,31 +469,25 @@ namespace MQTTnet.Client | |||
StartProcessReceivedPacket(packet); | |||
} | |||
} | |||
catch (OperationCanceledException) | |||
{ | |||
} | |||
catch (Exception exception) | |||
{ | |||
if (_cancellationTokenSource.IsCancellationRequested) | |||
if (exception is OperationCanceledException) | |||
{ | |||
return; | |||
} | |||
if (exception is MqttCommunicationException) | |||
else if (exception is MqttCommunicationException) | |||
{ | |||
_logger.Warning<MqttClient>(exception, "MQTT communication exception while receiving packets."); | |||
} | |||
else | |||
{ | |||
_logger.Error<MqttClient>(exception, "Unhandled exception while receiving packets."); | |||
} | |||
await DisconnectInternalAsync(exception).ConfigureAwait(false); | |||
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
_logger.Info<MqttClient>("Stopped receiving packets."); | |||
_logger.Verbose<MqttClient>("Stopped receiving packets."); | |||
} | |||
} | |||
@@ -6,7 +6,7 @@ namespace MQTTnet.Diagnostics | |||
{ | |||
event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; | |||
void Trace<TSource>(string message, params object[] parameters); | |||
void Verbose<TSource>(string message, params object[] parameters); | |||
void Info<TSource>(string message, params object[] parameters); | |||
@@ -13,7 +13,7 @@ namespace MQTTnet.Diagnostics | |||
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished; | |||
public void Trace<TSource>(string message, params object[] parameters) | |||
public void Verbose<TSource>(string message, params object[] parameters) | |||
{ | |||
Publish<TSource>(MqttNetLogLevel.Verbose, null, message, parameters); | |||
} | |||
@@ -8,18 +8,28 @@ namespace MQTTnet.Implementations | |||
{ | |||
public class MqttClientAdapterFactory : IMqttClientAdapterFactory | |||
{ | |||
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) | |||
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) | |||
{ | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
switch (options) | |||
var serializer = new MqttPacketSerializer { ProtocolVersion = options.ProtocolVersion }; | |||
switch (options.ChannelOptions) | |||
{ | |||
case MqttClientTcpOptions tcpOptions: | |||
return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), new MqttPacketSerializer(), logger); | |||
{ | |||
return new MqttChannelAdapter(new MqttTcpChannel(tcpOptions), serializer, logger); | |||
} | |||
case MqttClientWebSocketOptions webSocketOptions: | |||
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), new MqttPacketSerializer(), logger); | |||
{ | |||
return new MqttChannelAdapter(new MqttWebSocketChannel(webSocketOptions), serializer, logger); | |||
} | |||
default: | |||
throw new NotSupportedException(); | |||
{ | |||
throw new NotSupportedException(); | |||
} | |||
} | |||
} | |||
} | |||
@@ -9,12 +9,12 @@ using MQTTnet.Server; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public class MqttServerAdapter : IMqttServerAdapter, IDisposable | |||
public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable | |||
{ | |||
private readonly IMqttNetLogger _logger; | |||
private StreamSocketListener _defaultEndpointSocket; | |||
public MqttServerAdapter(IMqttNetLogger logger) | |||
public MqttTcpServerAdapter(IMqttNetLogger logger) | |||
{ | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
} | |||
@@ -67,7 +67,7 @@ namespace MQTTnet.Implementations | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at default endpoint."); | |||
_logger.Error<MqttTcpServerAdapter>(exception, "Error while accepting connection at default endpoint."); | |||
} | |||
} | |||
} |
@@ -14,7 +14,7 @@ using MQTTnet.Server; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public class MqttServerAdapter : IMqttServerAdapter, IDisposable | |||
public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable | |||
{ | |||
private readonly IMqttNetLogger _logger; | |||
@@ -23,7 +23,7 @@ namespace MQTTnet.Implementations | |||
private Socket _tlsEndpointSocket; | |||
private X509Certificate2 _tlsCertificate; | |||
public MqttServerAdapter(IMqttNetLogger logger) | |||
public MqttTcpServerAdapter(IMqttNetLogger logger) | |||
{ | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
} | |||
@@ -117,7 +117,7 @@ namespace MQTTnet.Implementations | |||
return; | |||
} | |||
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at default endpoint."); | |||
_logger.Error<MqttTcpServerAdapter>(exception, "Error while accepting connection at default endpoint."); | |||
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | |||
} | |||
} | |||
@@ -152,7 +152,7 @@ namespace MQTTnet.Implementations | |||
return; | |||
} | |||
_logger.Error<MqttServerAdapter>(exception, "Error while accepting connection at TLS endpoint."); | |||
_logger.Error<MqttTcpServerAdapter>(exception, "Error while accepting connection at TLS endpoint."); | |||
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | |||
} | |||
} |
@@ -9,7 +9,7 @@ using MQTTnet.Client; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public sealed class MqttWebSocketChannel : IMqttChannel, IDisposable | |||
public sealed class MqttWebSocketChannel : IMqttChannel | |||
{ | |||
// ReSharper disable once MemberCanBePrivate.Global | |||
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global | |||
@@ -244,7 +244,7 @@ namespace MQTTnet.ManagedClient | |||
} | |||
finally | |||
{ | |||
_logger.Trace<ManagedMqttClient>("Stopped publishing messages."); | |||
_logger.Verbose<ManagedMqttClient>("Stopped publishing messages."); | |||
} | |||
} | |||
@@ -38,7 +38,14 @@ namespace MQTTnet | |||
public IMqttServer CreateMqttServer() | |||
{ | |||
var logger = new MqttNetLogger(); | |||
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttServerAdapter(logger) }, logger); | |||
return CreateMqttServer(logger); | |||
} | |||
public IMqttServer CreateMqttServer(IMqttNetLogger logger) | |||
{ | |||
if (logger == null) throw new ArgumentNullException(nameof(logger)); | |||
return CreateMqttServer(new List<IMqttServerAdapter> { new MqttTcpServerAdapter(logger) }, logger); | |||
} | |||
public IMqttServer CreateMqttServer(IEnumerable<IMqttServerAdapter> adapters, IMqttNetLogger logger) | |||
@@ -11,5 +11,9 @@ | |||
public string ClientId { get; } | |||
public MqttApplicationMessage ApplicationMessage { get; set; } | |||
public bool AcceptPublish { get; set; } = true; | |||
public bool CloseConnection { get; set; } | |||
} | |||
} |
@@ -4,11 +4,14 @@ namespace MQTTnet.Server | |||
{ | |||
public class MqttClientDisconnectedEventArgs : EventArgs | |||
{ | |||
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client) | |||
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client, bool wasCleanDisconnect) | |||
{ | |||
Client = client ?? throw new ArgumentNullException(nameof(client)); | |||
WasCleanDisconnect = wasCleanDisconnect; | |||
} | |||
public ConnectedMqttClient Client { get; } | |||
public bool WasCleanDisconnect { get; } | |||
} | |||
} |
@@ -81,7 +81,7 @@ namespace MQTTnet.Server | |||
} | |||
finally | |||
{ | |||
_logger.Trace<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId); | |||
_logger.Verbose<MqttClientSession>("Client {0}: Stopped checking keep alive timeout.", _clientId); | |||
} | |||
} | |||
@@ -57,7 +57,7 @@ namespace MQTTnet.Server | |||
_queue.Enqueue(packet); | |||
_queueWaitSemaphore.Release(); | |||
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); | |||
_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet (ClientId: {0}).", _clientSession.ClientId); | |||
} | |||
private async Task SendQueuedPacketsAsync(IMqttChannelAdapter adapter, CancellationToken cancellationToken) | |||
@@ -96,7 +96,7 @@ namespace MQTTnet.Server | |||
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false); | |||
_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); | |||
_logger.Verbose<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId); | |||
} | |||
catch (Exception exception) | |||
{ | |||
@@ -23,6 +23,7 @@ namespace MQTTnet.Server | |||
private IMqttChannelAdapter _adapter; | |||
private CancellationTokenSource _cancellationTokenSource; | |||
private MqttApplicationMessage _willMessage; | |||
private bool _wasCleanDisconnect; | |||
public MqttClientSession( | |||
string clientId, | |||
@@ -55,7 +56,7 @@ namespace MQTTnet.Server | |||
public bool IsConnected => _adapter != null; | |||
public async Task RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||
public async Task<bool> RunAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter adapter) | |||
{ | |||
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); | |||
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); | |||
@@ -64,6 +65,7 @@ namespace MQTTnet.Server | |||
{ | |||
var cancellationTokenSource = new CancellationTokenSource(); | |||
_wasCleanDisconnect = false; | |||
_willMessage = connectPacket.WillMessage; | |||
_adapter = adapter; | |||
_cancellationTokenSource = cancellationTokenSource; | |||
@@ -84,9 +86,11 @@ namespace MQTTnet.Server | |||
{ | |||
_logger.Error<MqttClientSession>(exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId); | |||
} | |||
return _wasCleanDisconnect; | |||
} | |||
public async Task StopAsync() | |||
public async Task StopAsync(bool wasCleanDisconnect = false) | |||
{ | |||
try | |||
{ | |||
@@ -95,6 +99,8 @@ namespace MQTTnet.Server | |||
return; | |||
} | |||
_wasCleanDisconnect = wasCleanDisconnect; | |||
_cancellationTokenSource?.Cancel(false); | |||
PendingMessagesQueue.WaitForCompletion(); | |||
@@ -110,9 +116,10 @@ namespace MQTTnet.Server | |||
finally | |||
{ | |||
var willMessage = _willMessage; | |||
if (willMessage != null) | |||
_willMessage = null; // clear willmessage so it is send just once | |||
if (willMessage != null && !wasCleanDisconnect) | |||
{ | |||
_willMessage = null; // clear willmessage so it is send just once | |||
await ApplicationMessageReceivedCallback(this, willMessage).ConfigureAwait(false); | |||
} | |||
} | |||
@@ -246,7 +253,12 @@ namespace MQTTnet.Server | |||
return HandleIncomingUnsubscribePacketAsync(adapter, unsubscribePacket, cancellationToken); | |||
} | |||
if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) | |||
if (packet is MqttDisconnectPacket) | |||
{ | |||
return StopAsync(true); | |||
} | |||
if (packet is MqttConnectPacket) | |||
{ | |||
return StopAsync(); | |||
} | |||
@@ -262,7 +274,6 @@ namespace MQTTnet.Server | |||
if (subscribeResult.CloseConnection) | |||
{ | |||
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttDisconnectPacket() }).ConfigureAwait(false); | |||
await StopAsync().ConfigureAwait(false); | |||
} | |||
@@ -29,7 +29,7 @@ namespace MQTTnet.Server | |||
} | |||
public Action<ConnectedMqttClient> ClientConnectedCallback { get; set; } | |||
public Action<ConnectedMqttClient> ClientDisconnectedCallback { get; set; } | |||
public Action<ConnectedMqttClient, bool> ClientDisconnectedCallback { get; set; } | |||
public Action<string, TopicFilter> ClientSubscribedTopicCallback { get; set; } | |||
public Action<string, string> ClientUnsubscribedTopicCallback { get; set; } | |||
public Action<string, MqttApplicationMessage> ApplicationMessageReceivedCallback { get; set; } | |||
@@ -37,7 +37,9 @@ namespace MQTTnet.Server | |||
public async Task RunSessionAsync(IMqttChannelAdapter clientAdapter, CancellationToken cancellationToken) | |||
{ | |||
var clientId = string.Empty; | |||
var wasCleanDisconnect = false; | |||
MqttClientSession clientSession = null; | |||
try | |||
{ | |||
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken) | |||
@@ -84,7 +86,7 @@ namespace MQTTnet.Server | |||
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion | |||
}); | |||
await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); | |||
wasCleanDisconnect = await clientSession.RunAsync(connectPacket, clientAdapter).ConfigureAwait(false); | |||
} | |||
catch (OperationCanceledException) | |||
{ | |||
@@ -110,7 +112,8 @@ namespace MQTTnet.Server | |||
ClientId = clientId, | |||
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion, | |||
PendingApplicationMessages = clientSession?.PendingMessagesQueue.Count ?? 0 | |||
}); | |||
}, | |||
wasCleanDisconnect); | |||
} | |||
} | |||
@@ -156,8 +159,13 @@ namespace MQTTnet.Server | |||
{ | |||
try | |||
{ | |||
applicationMessage = InterceptApplicationMessage(senderClientSession, applicationMessage); | |||
if (applicationMessage == null) | |||
var interceptorContext = InterceptApplicationMessage(senderClientSession, applicationMessage); | |||
if (interceptorContext.CloseConnection) | |||
{ | |||
await senderClientSession.StopAsync().ConfigureAwait(false); | |||
} | |||
if (interceptorContext.ApplicationMessage == null || !interceptorContext.AcceptPublish) | |||
{ | |||
return; | |||
} | |||
@@ -230,20 +238,20 @@ namespace MQTTnet.Server | |||
} | |||
} | |||
private MqttApplicationMessage InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) | |||
private MqttApplicationMessageInterceptorContext InterceptApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage) | |||
{ | |||
var interceptor = _options.ApplicationMessageInterceptor; | |||
if (interceptor == null) | |||
{ | |||
return applicationMessage; | |||
} | |||
var interceptorContext = new MqttApplicationMessageInterceptorContext( | |||
senderClientSession?.ClientId, | |||
applicationMessage); | |||
var interceptor = _options.ApplicationMessageInterceptor; | |||
if (interceptor == null) | |||
{ | |||
return interceptorContext; | |||
} | |||
interceptor(interceptorContext); | |||
return interceptorContext.ApplicationMessage; | |||
return interceptorContext; | |||
} | |||
private MqttConnectReturnCode ValidateConnection(MqttConnectPacket connectPacket) | |||
@@ -279,11 +287,11 @@ namespace MQTTnet.Server | |||
clientSession.Dispose(); | |||
clientSession = null; | |||
_logger.Trace<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId); | |||
_logger.Verbose<MqttClientSessionsManager>("Stopped existing session of client '{0}'.", connectPacket.ClientId); | |||
} | |||
else | |||
{ | |||
_logger.Trace<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId); | |||
_logger.Verbose<MqttClientSessionsManager>("Reusing existing session of client '{0}'.", connectPacket.ClientId); | |||
} | |||
} | |||
@@ -302,7 +310,7 @@ namespace MQTTnet.Server | |||
_sessions[connectPacket.ClientId] = clientSession; | |||
_logger.Trace<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId); | |||
_logger.Verbose<MqttClientSessionsManager>("Created a new session for client '{0}'.", connectPacket.ClientId); | |||
} | |||
return new GetOrCreateClientSessionResult { IsExistingSession = isExistingSession, Session = clientSession }; | |||
@@ -132,7 +132,7 @@ namespace MQTTnet.Server | |||
if (!saveIsRequired) | |||
{ | |||
_logger.Trace<MqttRetainedMessagesManager>("Skipped saving retained messages because no changes were detected."); | |||
_logger.Verbose<MqttRetainedMessagesManager>("Skipped saving retained messages because no changes were detected."); | |||
} | |||
if (saveIsRequired && _options.Storage != null) | |||
@@ -138,10 +138,10 @@ namespace MQTTnet.Server | |||
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(client)); | |||
} | |||
private void OnClientDisconnected(ConnectedMqttClient client) | |||
private void OnClientDisconnected(ConnectedMqttClient client, bool wasCleanDisconnect) | |||
{ | |||
_logger.Info<MqttServer>("Client '{0}': Disconnected.", client.ClientId); | |||
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client)); | |||
_logger.Info<MqttServer>("Client '{0}': Disconnected (clean={1}).", client.ClientId, wasCleanDisconnect); | |||
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(client, wasCleanDisconnect)); | |||
} | |||
private void OnClientSubscribedTopic(string clientId, TopicFilter topicFilter) | |||
@@ -162,7 +162,7 @@ namespace MQTTnet.Server | |||
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs) | |||
{ | |||
eventArgs.SessionTask = Task.Run( | |||
async () => await _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token).ConfigureAwait(false), | |||
() => _clientSessionsManager.RunSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), | |||
_cancellationTokenSource.Token); | |||
} | |||
} | |||
@@ -209,12 +209,9 @@ namespace MQTTnet.Core.Tests | |||
await s.StartAsync(new MqttServerOptions()); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.DisconnectAsync(); | |||
await Task.Delay(TimeSpan.FromSeconds(2)); | |||
// TODO: Find another way to wait for the retained components. | |||
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); | |||
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | |||
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build()); | |||
@@ -244,7 +241,7 @@ namespace MQTTnet.Core.Tests | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); | |||
await c1.DisconnectAsync(); | |||
var c2 = await serverAdapter.ConnectTestClient(s, "c2"); | |||
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | |||
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce)); | |||
@@ -274,7 +271,8 @@ namespace MQTTnet.Core.Tests | |||
await s.StartAsync(options); | |||
var c1 = await serverAdapter.ConnectTestClient(s, "c1"); | |||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||
await c1.DisconnectAsync(); | |||
} | |||
finally | |||
@@ -282,8 +280,7 @@ namespace MQTTnet.Core.Tests | |||
await s.StopAsync(); | |||
} | |||
await Task.Delay(TimeSpan.FromSeconds(2)); | |||
// TODO: Find another way to wait for the retained components. | |||
Assert.AreEqual(1, storage.Messages.Count); | |||
s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger()); | |||
@@ -385,17 +382,17 @@ namespace MQTTnet.Core.Tests | |||
private class TestStorage : IMqttServerStorage | |||
{ | |||
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>(); | |||
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>(); | |||
public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages) | |||
{ | |||
_messages = messages; | |||
Messages = messages; | |||
return Task.CompletedTask; | |||
} | |||
public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() | |||
{ | |||
return Task.FromResult(_messages); | |||
return Task.FromResult(Messages); | |||
} | |||
} | |||
@@ -13,7 +13,7 @@ namespace MQTTnet.Core.Tests | |||
_adapter = adapter; | |||
} | |||
public IMqttChannelAdapter CreateClientAdapter(IMqttClientChannelOptions options, IMqttNetLogger logger) | |||
public IMqttChannelAdapter CreateClientAdapter(IMqttClientOptions options, IMqttNetLogger logger) | |||
{ | |||
return _adapter; | |||
} | |||
@@ -0,0 +1,38 @@ | |||
using MQTTnet.Client; | |||
using MQTTnet.Server; | |||
using System; | |||
using System.Threading.Tasks; | |||
namespace MQTTnet.Core.Tests | |||
{ | |||
public static class TestServerExtensions | |||
{ | |||
/// <summary> | |||
/// publishes a message with a client and waits in the server until a message with the same topic is received | |||
/// </summary> | |||
/// <returns></returns> | |||
public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message) | |||
{ | |||
var tcs = new TaskCompletionSource<object>(); | |||
EventHandler<MqttApplicationMessageReceivedEventArgs> handler = (sender, args) => | |||
{ | |||
if (args.ApplicationMessage.Topic == message.Topic) | |||
{ | |||
tcs.SetResult(true); | |||
} | |||
}; | |||
server.ApplicationMessageReceived += handler; | |||
try | |||
{ | |||
await client.PublishAsync(message).ConfigureAwait(false); | |||
await tcs.Task.ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
server.ApplicationMessageReceived -= handler; | |||
} | |||
} | |||
} | |||
} |
@@ -3,7 +3,6 @@ using System.Text; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Protocol; | |||
using MQTTnet.Server; | |||
using Newtonsoft.Json.Linq; | |||
namespace MQTTnet.TestApp.NetCore | |||
{ | |||
@@ -38,6 +37,12 @@ namespace MQTTnet.TestApp.NetCore | |||
// based payload with the timestamp is a suitable use case. | |||
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(DateTime.Now.ToString("O")); | |||
} | |||
if (context.ApplicationMessage.Topic == "not_allowed_topic") | |||
{ | |||
context.AcceptPublish = false; | |||
context.CloseConnection = true; | |||
} | |||
}, | |||
SubscriptionInterceptor = context => | |||
{ | |||
@@ -72,27 +77,27 @@ namespace MQTTnet.TestApp.NetCore | |||
ConsoleColor.Magenta); | |||
}; | |||
options.ApplicationMessageInterceptor = c => | |||
{ | |||
if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0) | |||
{ | |||
return; | |||
} | |||
try | |||
{ | |||
var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); | |||
var timestampProperty = content.Property("timestamp"); | |||
if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) | |||
{ | |||
timestampProperty.Value = DateTime.Now.ToString("O"); | |||
c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); | |||
} | |||
} | |||
catch (Exception) | |||
{ | |||
} | |||
}; | |||
//options.ApplicationMessageInterceptor = c => | |||
//{ | |||
// if (c.ApplicationMessage.Payload == null || c.ApplicationMessage.Payload.Length == 0) | |||
// { | |||
// return; | |||
// } | |||
// try | |||
// { | |||
// var content = JObject.Parse(Encoding.UTF8.GetString(c.ApplicationMessage.Payload)); | |||
// var timestampProperty = content.Property("timestamp"); | |||
// if (timestampProperty != null && timestampProperty.Value.Type == JTokenType.Null) | |||
// { | |||
// timestampProperty.Value = DateTime.Now.ToString("O"); | |||
// c.ApplicationMessage.Payload = Encoding.UTF8.GetBytes(content.ToString()); | |||
// } | |||
// } | |||
// catch (Exception) | |||
// { | |||
// } | |||
//}; | |||
mqttServer.ClientDisconnected += (s, e) => | |||
{ | |||
@@ -13,6 +13,8 @@ using MQTTnet.Implementations; | |||
using MQTTnet.ManagedClient; | |||
using MQTTnet.Protocol; | |||
using MQTTnet.Server; | |||
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs; | |||
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs; | |||
namespace MQTTnet.TestApp.UniversalWindows | |||
{ | |||
@@ -33,6 +35,11 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
private async void OnTraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e) | |||
{ | |||
_traceMessages.Enqueue(e.TraceMessage); | |||
await UpdateLogAsync(); | |||
} | |||
private async Task UpdateLogAsync() | |||
{ | |||
while (_traceMessages.Count > 100) | |||
{ | |||
_traceMessages.TryDequeue(out _); | |||
@@ -113,11 +120,15 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
{ | |||
await _mqttClient.DisconnectAsync(); | |||
_mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived; | |||
_mqttClient.Connected -= OnConnected; | |||
_mqttClient.Disconnected -= OnDisconnected; | |||
} | |||
var factory = new MqttFactory(); | |||
_mqttClient = factory.CreateMqttClient(); | |||
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived; | |||
_mqttClient.Connected += OnConnected; | |||
_mqttClient.Disconnected += OnDisconnected; | |||
await _mqttClient.ConnectAsync(options); | |||
} | |||
@@ -127,6 +138,22 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
} | |||
} | |||
private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs e) | |||
{ | |||
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, | |||
"", MqttNetLogLevel.Info, "! DISCONNECTED EVENT FIRED", null)); | |||
Task.Run(UpdateLogAsync); | |||
} | |||
private void OnConnected(object sender, MqttClientConnectedEventArgs e) | |||
{ | |||
_traceMessages.Enqueue(new MqttNetLogMessage("", DateTime.Now, -1, | |||
"", MqttNetLogLevel.Info, "! CONNECTED EVENT FIRED", null)); | |||
Task.Run(UpdateLogAsync); | |||
} | |||
private async void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs) | |||
{ | |||
var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; | |||