Browse Source

Merge branch 'develop'

release/3.x.x
Christian 6 years ago
parent
commit
fcf748e980
29 changed files with 291 additions and 138 deletions
  1. +6
    -10
      Build/MQTTnet.nuspec
  2. +4
    -4
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  3. +0
    -17
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapterExtensions.cs
  4. +28
    -3
      Frameworks/MQTTnet.NetStandard/ApplicationMessagePublisherExtensions.cs
  5. +2
    -0
      Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs
  6. +41
    -10
      Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
  7. +8
    -0
      Frameworks/MQTTnet.NetStandard/Client/MqttClientExtensions.cs
  8. +3
    -1
      Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs
  9. +1
    -1
      Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs
  10. +17
    -7
      Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs
  11. +0
    -6
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs
  12. +22
    -14
      Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
  13. +28
    -3
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs
  14. +25
    -0
      Frameworks/MQTTnet.NetStandard/MqttApplicationMessageExtensions.cs
  15. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/IMqttPacketWithIdentifier.cs
  16. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/MqttBasePublishPacket.cs
  17. +0
    -26
      Frameworks/MQTTnet.NetStandard/Packets/MqttPacketExtensions.cs
  18. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/MqttSubAckPacket.cs
  19. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/MqttSubscribePacket.cs
  20. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/MqttUnsubAckPacket.cs
  21. +1
    -1
      Frameworks/MQTTnet.NetStandard/Packets/MqttUnsubscribe.cs
  22. +55
    -10
      Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs
  23. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  24. +20
    -8
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  25. +4
    -4
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  26. +10
    -3
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs
  27. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj
  28. +1
    -1
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml
  29. +8
    -2
      Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs

+ 6
- 10
Build/MQTTnet.nuspec View File

@@ -2,7 +2,7 @@
<package >
<metadata>
<id>MQTTnet</id>
<version>2.7.1</version>
<version>2.7.2</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,15 +10,11 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Fixed wrong parsing of ConnAck packet for protocol version 3.1.0.
* [Core] Log messages are now overriding ToString() and providing a ready to use text representation.
* [Client] Optimized package dispatcher and added several new exceptions.
* [Client] The _ManagedClient_ now has an event which is fired after a queued application message was processed (including exception).
* [Client] The _ManagedClient_ now supports unsubscribing (thanks to @lerppana)
* [Server] Fixed some minor async issues.
* [Server] Fixed wrong comparison of the topic and QoS for retained messages.
* [Server] Added a property which provides access to the used options (read only).
* [Server] Fixed a null ref expection when using an interceptor and publishing via the server directly.
<releaseNotes>* [Client] Added the subprotocol "mqtt" as default for web socket based connections.
* [Client] Added a new client setting called "KeepAliveSendInterval". It allows configuring the effective interval for sending ping requests.
* [Client] The client will no longer send ping requests if other packets are sent within the configured interval.
* [Server] The server now generates a valid packet identifier when disaptching publish packets to clients.
* [Core] Add several new extension methods.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 4
- 4
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -138,7 +138,7 @@ namespace MQTTnet.Adapter
}

var body = header.BodyLength <= ReadBufferSize ? new MemoryStream(header.BodyLength) : new MemoryStream();
var buffer = new byte[ReadBufferSize];
while (body.Length < header.BodyLength)
{
@@ -149,7 +149,7 @@ namespace MQTTnet.Adapter
}

var readBytesCount = await stream.ReadAsync(buffer, 0, bytesLeft, cancellationToken).ConfigureAwait(false);
// Check if the client closed the connection before sending the full body.
if (readBytesCount == 0)
{
@@ -162,7 +162,7 @@ namespace MQTTnet.Adapter
}

body.Seek(0L, SeekOrigin.Begin);
return new ReceivedMqttPacket(header, body);
}

@@ -190,7 +190,7 @@ namespace MQTTnet.Adapter
}
catch (COMException comException)
{
if ((uint) comException.HResult == ErrorOperationAborted)
if ((uint)comException.HResult == ErrorOperationAborted)
{
throw new OperationCanceledException();
}


+ 0
- 17
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapterExtensions.cs View File

@@ -1,17 +0,0 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Packets;

namespace MQTTnet.Adapter
{
public static class MqttChannelAdapterExtensions
{
public static Task SendPacketsAsync(this IMqttChannelAdapter adapter, TimeSpan timeout, CancellationToken cancellationToken, params MqttBasePacket[] packets)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

return adapter.SendPacketsAsync(timeout, cancellationToken, packets);
}
}
}

+ 28
- 3
Frameworks/MQTTnet.NetStandard/ApplicationMessagePublisherExtensions.cs View File

@@ -1,16 +1,41 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Protocol;

namespace MQTTnet
{
public static class ApplicationMessagePublisherExtensions
{
public static Task PublishAsync(this IApplicationMessagePublisher client, params MqttApplicationMessage[] applicationMessages)
public static Task PublishAsync(this IApplicationMessagePublisher publisher, params MqttApplicationMessage[] applicationMessages)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (publisher == null) throw new ArgumentNullException(nameof(publisher));
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

return client.PublishAsync(applicationMessages);
return publisher.PublishAsync(applicationMessages);
}

public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic)
{
if (publisher == null) throw new ArgumentNullException(nameof(publisher));
if (topic == null) throw new ArgumentNullException(nameof(topic));
return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).Build());
}

public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload)
{
if (publisher == null) throw new ArgumentNullException(nameof(publisher));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).Build());
}

public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
{
if (publisher == null) throw new ArgumentNullException(nameof(publisher));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
}
}
}

+ 2
- 0
Frameworks/MQTTnet.NetStandard/Client/IMqttClientOptions.cs View File

@@ -13,6 +13,8 @@ namespace MQTTnet.Client
TimeSpan CommunicationTimeout { get; }
TimeSpan KeepAlivePeriod { get; }
TimeSpan? KeepAliveSendInterval { get; set; }

MqttProtocolVersion ProtocolVersion { get; }

IMqttClientChannelOptions ChannelOptions { get; }


+ 41
- 10
Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs View File

@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -15,6 +16,7 @@ namespace MQTTnet.Client
public class MqttClient : IMqttClient
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly Stopwatch _sendTracker = new Stopwatch();
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly MqttPacketDispatcher _packetDispatcher;
private readonly IMqttNetLogger _logger;
@@ -59,10 +61,12 @@ namespace MQTTnet.Client
_logger.Trace<MqttClient>("Connection with server established.");

await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);

var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false);
_logger.Trace<MqttClient>("MQTT connection with server established.");

_sendTracker.Restart();

if (_options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
@@ -149,7 +153,7 @@ namespace MQTTnet.Client
case MqttQualityOfServiceLevel.AtMostOnce:
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosGroup).ConfigureAwait(false);
await SendAsync(qosGroup.ToArray()).ConfigureAwait(false);
break;
}
case MqttQualityOfServiceLevel.AtLeastOnce:
@@ -167,8 +171,14 @@ namespace MQTTnet.Client
foreach (var publishPacket in qosGroup)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();

var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false);
var pubRelPacket = new MqttPubRelPacket
{
PacketIdentifier = pubRecPacket.PacketIdentifier
};

await SendAndReceiveAsync<MqttPubCompPacket>(pubRelPacket).ConfigureAwait(false);
}

break;
@@ -325,24 +335,31 @@ namespace MQTTnet.Client

private Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
return SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
var response = new MqttPubCompPacket
{
PacketIdentifier = pubRelPacket.PacketIdentifier
};

return SendAsync(response);
}

private Task SendAsync(MqttBasePacket packet)
private Task SendAsync(params MqttBasePacket[] packets)
{
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packet);
_sendTracker.Restart();
return _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, packets);
}

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
ushort identifier = 0;
ushort? identifier = null;
if (requestPacket is IMqttPacketWithIdentifier requestPacketWithIdentifier)
{
identifier = requestPacketWithIdentifier.PacketIdentifier;
}

var packetAwaiter = _packetDispatcher.WaitForPacketAsync(typeof(TResponsePacket), identifier, _options.CommunicationTimeout);
await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);
await SendAsync(requestPacket).ConfigureAwait(false);

return (TResponsePacket)await packetAwaiter.ConfigureAwait(false);
}

@@ -354,8 +371,22 @@ namespace MQTTnet.Client
{
while (!cancellationToken.IsCancellationRequested)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false);
TimeSpan keepAliveSendInterval;
if (_options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
}
else
{
keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
}

if (_sendTracker.Elapsed > keepAliveSendInterval)
{
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false);
}
await Task.Delay(keepAliveSendInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)


+ 8
- 0
Frameworks/MQTTnet.NetStandard/Client/MqttClientExtensions.cs View File

@@ -24,6 +24,14 @@ namespace MQTTnet.Client
return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
}

public static Task<IList<MqttSubscribeResult>> SubscribeAsync(this IMqttClient client, string topic)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return client.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
}

public static Task UnsubscribeAsync(this IMqttClient client, params string[] topicFilters)
{
if (client == null) throw new ArgumentNullException(nameof(client));


+ 3
- 1
Frameworks/MQTTnet.NetStandard/Client/MqttClientOptions.cs View File

@@ -13,7 +13,9 @@ namespace MQTTnet.Client

public IMqttClientCredentials Credentials { get; set; } = new MqttClientCredentials();

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(15);

public TimeSpan? KeepAliveSendInterval { get; set; }

public TimeSpan CommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Client/MqttClientWebSocketOptions.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Client

public IDictionary<string, string> RequestHeaders { get; set; }

public ICollection<string> SubProtocols { get; set; }
public ICollection<string> SubProtocols { get; set; } = new List<string> { "mqtt" };

public CookieContainer CookieContainer { get; set; }



+ 17
- 7
Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs View File

@@ -18,7 +18,7 @@ namespace MQTTnet.Client
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task<MqttBasePacket> WaitForPacketAsync(Type responseType, ushort identifier, TimeSpan timeout)
public async Task<MqttBasePacket> WaitForPacketAsync(Type responseType, ushort? identifier, TimeSpan timeout)
{
var packetAwaiter = AddPacketAwaiter(responseType, identifier);
try
@@ -44,13 +44,13 @@ namespace MQTTnet.Client

if (_awaiters.TryGetValue(type, out var byId))
{
ushort identifier = 0;
ushort? identifier = 0;
if (packet is IMqttPacketWithIdentifier packetWithIdentifier)
{
identifier = packetWithIdentifier.PacketIdentifier;
}

if (byId.TryRemove(identifier, out var tcs))
if (byId.TryRemove(identifier.Value, out var tcs))
{
tcs.TrySetResult(packet);
return;
@@ -65,12 +65,17 @@ namespace MQTTnet.Client
_awaiters.Clear();
}

private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(Type responseType, ushort identifier)
private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(Type responseType, ushort? identifier)
{
var tcs = new TaskCompletionSource<MqttBasePacket>();

if (!identifier.HasValue)
{
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
if (!byId.TryAdd(identifier, tcs))
if (!byId.TryAdd(identifier.Value, tcs))
{
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}.");
}
@@ -78,10 +83,15 @@ namespace MQTTnet.Client
return tcs;
}

private void RemovePacketAwaiter(Type responseType, ushort identifier)
private void RemovePacketAwaiter(Type responseType, ushort? identifier)
{
if (!identifier.HasValue)
{
identifier = 0;
}

var byId = _awaiters.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
byId.TryRemove(identifier, out var _);
byId.TryRemove(identifier.Value, out var _);
}
}
}

+ 0
- 6
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs View File

@@ -69,12 +69,6 @@ namespace MQTTnet.Implementations

public void Dispose()
{
SendStream?.Dispose();
SendStream = null;

ReceiveStream?.Dispose();
ReceiveStream = null;

_socket?.Dispose();
_socket = null;
}


+ 22
- 14
Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs View File

@@ -11,9 +11,9 @@ namespace MQTTnet.Implementations
{
public class WebSocketStream : Stream
{
private readonly WebSocket _webSocket;
private readonly byte[] _chunkBuffer = new byte[MqttWebSocketChannel.BufferSize];
private readonly byte[] _chunckBuffer = new byte[MqttWebSocketChannel.BufferSize];
private readonly Queue<byte> _buffer = new Queue<byte>(MqttWebSocketChannel.BufferSize);
private readonly WebSocket _webSocket;

public WebSocketStream(WebSocket webSocket)
{
@@ -55,25 +55,28 @@ namespace MQTTnet.Implementations
// Use existing date from buffer.
while (count > 0 && _buffer.Any())
{
buffer[offset++] = _buffer.Dequeue();
buffer[offset] = _buffer.Dequeue();
count--;
bytesRead++;
offset++;
}

if (count == 0)
{
return bytesRead;
}

// Fetch new data if the buffer is not full.
while (_webSocket.State == WebSocketState.Open)
{
await FetchChunkAsync(cancellationToken);
await FetchChunkAsync(cancellationToken).ConfigureAwait(false);

while (count > 0 && _buffer.Any())
{
buffer[offset++] = _buffer.Dequeue();
buffer[offset] = _buffer.Dequeue();
count--;
bytesRead++;
offset++;
}

if (count == 0)
@@ -111,19 +114,24 @@ namespace MQTTnet.Implementations
}

private async Task FetchChunkAsync(CancellationToken cancellationToken)
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false);

for (var i = 0; i < response.Count; i++)
{
var @byte = _chunkBuffer[i];
_buffer.Enqueue(@byte);
}
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(_chunckBuffer, 0, _chunckBuffer.Length), cancellationToken).ConfigureAwait(false);

if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}
else if (response.MessageType == WebSocketMessageType.Binary)
{
for (var i = 0; i < response.Count; i++)
{
_buffer.Enqueue(_chunckBuffer[i]);
}
}
else if (response.MessageType == WebSocketMessageType.Text)
{
throw new MqttProtocolViolationException("WebSocket channel received TEXT message.");
}
}
}
}

+ 28
- 3
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs View File

@@ -6,6 +6,7 @@ namespace MQTTnet.ManagedClient
public class ManagedMqttClientOptionsBuilder
{
private readonly ManagedMqttClientOptions _options = new ManagedMqttClientOptions();
private MqttClientOptionsBuilder _clientOptionsBuilder;

public ManagedMqttClientOptionsBuilder WithAutoReconnectDelay(TimeSpan value)
{
@@ -21,7 +22,24 @@ namespace MQTTnet.ManagedClient

public ManagedMqttClientOptionsBuilder WithClientOptions(IMqttClientOptions value)
{
if (_clientOptionsBuilder != null)
{
throw new InvalidOperationException("Cannot use client options builder and client options at the same time.");
}

_options.ClientOptions = value ?? throw new ArgumentNullException(nameof(value));

return this;
}

public ManagedMqttClientOptionsBuilder WithClientOptions(MqttClientOptionsBuilder builder)
{
if (_options.ClientOptions != null)
{
throw new InvalidOperationException("Cannot use client options builder and client options at the same time.");
}

_clientOptionsBuilder = builder;
return this;
}

@@ -29,15 +47,22 @@ namespace MQTTnet.ManagedClient
{
if (options == null) throw new ArgumentNullException(nameof(options));

var builder = new MqttClientOptionsBuilder();
options(builder);
_options.ClientOptions = builder.Build();
if (_clientOptionsBuilder != null)
{
_clientOptionsBuilder = new MqttClientOptionsBuilder();
}

options(_clientOptionsBuilder);
return this;
}

public ManagedMqttClientOptions Build()
{
if (_clientOptionsBuilder != null)
{
_options.ClientOptions = _clientOptionsBuilder.Build();
}

if (_options.ClientOptions == null)
{
throw new InvalidOperationException("The ClientOptions cannot be null.");


+ 25
- 0
Frameworks/MQTTnet.NetStandard/MqttApplicationMessageExtensions.cs View File

@@ -0,0 +1,25 @@
using System;
using System.Text;

namespace MQTTnet
{
public static class MqttApplicationMessageExtensions
{
public static string ConvertPayloadToString(this MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

if (applicationMessage.Payload == null)
{
return null;
}

if (applicationMessage.Payload.Length == 0)
{
return string.Empty;
}

return Encoding.UTF8.GetString(applicationMessage.Payload, 0, applicationMessage.Payload.Length);
}
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/IMqttPacketWithIdentifier.cs View File

@@ -2,6 +2,6 @@
{
public interface IMqttPacketWithIdentifier
{
ushort PacketIdentifier { get; set; }
ushort? PacketIdentifier { get; set; }
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/MqttBasePublishPacket.cs View File

@@ -2,6 +2,6 @@
{
public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public ushort? PacketIdentifier { get; set; }
}
}

+ 0
- 26
Frameworks/MQTTnet.NetStandard/Packets/MqttPacketExtensions.cs View File

@@ -1,26 +0,0 @@
using System;

namespace MQTTnet.Packets
{
public static class MqttPacketExtensions
{
public static TResponsePacket CreateResponse<TResponsePacket>(this MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

var responsePacket = Activator.CreateInstance<TResponsePacket>();

if (responsePacket is IMqttPacketWithIdentifier responsePacketWithIdentifier)
{
if (!(packet is IMqttPacketWithIdentifier requestPacketWithIdentifier))
{
throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
}

responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
}

return responsePacket;
}
}
}

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/MqttSubAckPacket.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Packets
{
public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public ushort? PacketIdentifier { get; set; }

public IList<MqttSubscribeReturnCode> SubscribeReturnCodes { get; } = new List<MqttSubscribeReturnCode>();



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/MqttSubscribePacket.cs View File

@@ -5,7 +5,7 @@ namespace MQTTnet.Packets
{
public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public ushort? PacketIdentifier { get; set; }

public IList<TopicFilter> TopicFilters { get; set; } = new List<TopicFilter>();



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/MqttUnsubAckPacket.cs View File

@@ -2,7 +2,7 @@
{
public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public ushort? PacketIdentifier { get; set; }

public override string ToString()
{


+ 1
- 1
Frameworks/MQTTnet.NetStandard/Packets/MqttUnsubscribe.cs View File

@@ -4,7 +4,7 @@ namespace MQTTnet.Packets
{
public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
public ushort? PacketIdentifier { get; set; }

public IList<string> TopicFilters { get; set; } = new List<string>();



+ 55
- 10
Frameworks/MQTTnet.NetStandard/Serializer/MqttPacketSerializer.cs View File

@@ -397,7 +397,12 @@ namespace MQTTnet.Serializer

private static byte Serialize(MqttPubRelPacket packet, MqttPacketWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubRel packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRel, 0x02);
}
@@ -410,7 +415,12 @@ namespace MQTTnet.Serializer

if (packet.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("Publish packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
}
else
{
@@ -444,21 +454,36 @@ namespace MQTTnet.Serializer

private static byte Serialize(MqttPubAckPacket packet, MqttPacketWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubAck);
}

private static byte Serialize(MqttPubRecPacket packet, MqttPacketWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubRec packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubRec);
}

private static byte Serialize(MqttPubCompPacket packet, MqttPacketWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("PubComp packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.PubComp);
}
@@ -467,7 +492,12 @@ namespace MQTTnet.Serializer
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("Subscribe packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

if (packet.TopicFilters?.Count > 0)
{
@@ -483,7 +513,12 @@ namespace MQTTnet.Serializer

private static byte Serialize(MqttSubAckPacket packet, MqttPacketWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("SubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

if (packet.SubscribeReturnCodes?.Any() == true)
{
@@ -500,7 +535,12 @@ namespace MQTTnet.Serializer
{
if (!packet.TopicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.10.3-2].");

writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("Unsubscribe packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);

if (packet.TopicFilters?.Any() == true)
{
@@ -513,9 +553,14 @@ namespace MQTTnet.Serializer
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
}

private static byte Serialize(IMqttPacketWithIdentifier packet, BinaryWriter writer)
private static byte Serialize(MqttUnsubAckPacket packet, BinaryWriter writer)
{
writer.Write(packet.PacketIdentifier);
if (!packet.PacketIdentifier.HasValue)
{
throw new MqttProtocolViolationException("UnsubAck packet has no packet identifier.");
}

writer.Write(packet.PacketIdentifier.Value);
return MqttPacketWriter.BuildFixedHeader(MqttControlPacketType.UnsubAck);
}



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs View File

@@ -78,7 +78,7 @@ namespace MQTTnet.Server
throw new InvalidOperationException(); // should not happen
}

await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, packet).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { packet }).ConfigureAwait(false);

_logger.Trace<MqttClientPendingMessagesQueue>("Enqueued packet sent (ClientId: {0}).", _clientSession.ClientId);
}


+ 20
- 8
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
@@ -14,6 +15,7 @@ namespace MQTTnet.Server
{
public sealed class MqttClientSession : IDisposable
{
private readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();
private readonly IMqttServerOptions _options;
private readonly IMqttNetLogger _logger;
private readonly MqttRetainedMessagesManager _retainedMessagesManager;
@@ -129,6 +131,11 @@ namespace MQTTnet.Server
var publishPacket = applicationMessage.ToPublishPacket();
publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

PendingMessagesQueue.Enqueue(publishPacket);
}

@@ -205,7 +212,7 @@ namespace MQTTnet.Server

if (packet is MqttPingReqPacket)
{
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttPingRespPacket() });
}

if (packet is MqttPubRelPacket pubRelPacket)
@@ -215,7 +222,12 @@ namespace MQTTnet.Server

if (packet is MqttPubRecPacket pubRecPacket)
{
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>());
var responsePacket = new MqttPubRelPacket
{
PacketIdentifier = pubRecPacket.PacketIdentifier
};

return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { responsePacket });
}

if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
@@ -246,11 +258,11 @@ namespace MQTTnet.Server
private async Task HandleIncomingSubscribePacketAsync(IMqttChannelAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{
var subscribeResult = await SubscriptionsManager.SubscribeAsync(subscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { subscribeResult.ResponsePacket }).ConfigureAwait(false);

if (subscribeResult.CloseConnection)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket()).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttDisconnectPacket() }).ConfigureAwait(false);
await StopAsync().ConfigureAwait(false);
}

@@ -260,7 +272,7 @@ namespace MQTTnet.Server
private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
{
var unsubscribeResult = await SubscriptionsManager.UnsubscribeAsync(unsubscribePacket).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { unsubscribeResult });
}

private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
@@ -302,7 +314,7 @@ namespace MQTTnet.Server
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);

var response = new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response }).ConfigureAwait(false);
}

private async Task HandleIncomingPublishPacketWithQoS2(IMqttChannelAdapter adapter, MqttApplicationMessage applicationMessage, MqttPublishPacket publishPacket, CancellationToken cancellationToken)
@@ -311,13 +323,13 @@ namespace MQTTnet.Server
await ApplicationMessageReceivedCallback(this, applicationMessage).ConfigureAwait(false);

var response = new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier };
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response).ConfigureAwait(false);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response }).ConfigureAwait(false);
}

private Task HandleIncomingPubRelPacketAsync(IMqttChannelAdapter adapter, MqttPubRelPacket pubRelPacket, CancellationToken cancellationToken)
{
var response = new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier };
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, response);
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { response });
}
}
}

+ 4
- 4
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs View File

@@ -53,10 +53,10 @@ namespace MQTTnet.Server
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}).ConfigureAwait(false);
}}).ConfigureAwait(false);

return;
}
@@ -64,11 +64,11 @@ namespace MQTTnet.Server
var result = await GetOrCreateClientSessionAsync(connectPacket).ConfigureAwait(false);
clientSession = result.Session;

await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new[] { new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = result.IsExistingSession
}).ConfigureAwait(false);
}}).ConfigureAwait(false);

ClientConnectedCallback?.Invoke(new ConnectedMqttClient
{


+ 10
- 3
Frameworks/MQTTnet.NetStandard/Server/MqttClientSubscriptionsManager.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet.Server
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly IMqttServerOptions _options;
private readonly string _clientId;
public MqttClientSubscriptionsManager(IMqttServerOptions options, string clientId)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
@@ -30,7 +30,11 @@ namespace MQTTnet.Server

var result = new MqttClientSubscribeResult
{
ResponsePacket = subscribePacket.CreateResponse<MqttSubAckPacket>(),
ResponsePacket = new MqttSubAckPacket
{
PacketIdentifier = subscribePacket.PacketIdentifier
},

CloseConnection = false
};

@@ -87,7 +91,10 @@ namespace MQTTnet.Server
_semaphore.Release();
}

return unsubscribePacket.CreateResponse<MqttUnsubAckPacket>();
return new MqttUnsubAckPacket
{
PacketIdentifier = unsubscribePacket.PacketIdentifier
};
}

public async Task<CheckSubscriptionsResult> CheckSubscriptionsAsync(MqttApplicationMessage applicationMessage)


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MQTTnet.TestApp.UniversalWindows.csproj View File

@@ -12,7 +12,7 @@
<DefaultLanguage>en-US</DefaultLanguage>
<TargetPlatformIdentifier>UAP</TargetPlatformIdentifier>
<TargetPlatformVersion Condition=" '$(TargetPlatformVersion)' == '' ">10.0.16299.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.15063.0</TargetPlatformMinVersion>
<TargetPlatformMinVersion>10.0.16299.0</TargetPlatformMinVersion>
<MinimumVisualStudioVersion>14</MinimumVisualStudioVersion>
<FileAlignment>512</FileAlignment>
<ProjectTypeGuids>{A5A43C5B-DE2A-4C0C-9213-0A381AF9435A};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>


+ 1
- 1
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml View File

@@ -148,7 +148,7 @@
</PivotItem>
<PivotItem Header="Log">
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}">
<Button Click="Clear" Width="120">Clear</Button>
<Button Click="ClearLog" Width="120">Clear</Button>
</StackPanel>
</PivotItem>
</Pivot>


+ 8
- 2
Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs View File

@@ -42,7 +42,8 @@ namespace MQTTnet.TestApp.UniversalWindows
foreach (var traceMessage in _traceMessages)
{
logText.AppendFormat(
"[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}", traceMessage.Timestamp,
"[{0:yyyy-MM-dd HH:mm:ss.fff}] [{1}] [{2}] [{3}] [{4}]{5}",
traceMessage.Timestamp,
traceMessage.Level,
traceMessage.Source,
traceMessage.ThreadId,
@@ -197,8 +198,13 @@ namespace MQTTnet.TestApp.UniversalWindows
}
}

private void Clear(object sender, RoutedEventArgs e)
private void ClearLog(object sender, RoutedEventArgs e)
{
while (_traceMessages.Count > 0)
{
_traceMessages.TryDequeue(out _);
}

Trace.Text = string.Empty;
}



Loading…
Cancel
Save