@@ -13,11 +13,9 @@ | |||||
<releaseNotes>* Updated to MQTTnet 2.8.0. | <releaseNotes>* Updated to MQTTnet 2.8.0. | ||||
</releaseNotes> | </releaseNotes> | ||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | |||||
<dependencies> | <dependencies> | ||||
<group targetFramework="netstandard2.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</dependencies> | </dependencies> | ||||
</metadata> | </metadata> | ||||
@@ -15,24 +15,7 @@ | |||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
<dependencies> | <dependencies> | ||||
<group targetFramework="netstandard2.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="netstandard1.3"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="netstandard2.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="uap10.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="net452"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="net461"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<dependency id="MQTTnet" version="2.8.0-beta1" /> | |||||
</dependencies> | </dependencies> | ||||
</metadata> | </metadata> | ||||
@@ -15,24 +15,7 @@ | |||||
<copyright>Copyright Christian Kratky 2016-2018</copyright> | <copyright>Copyright Christian Kratky 2016-2018</copyright> | ||||
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | <tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags> | ||||
<dependencies> | <dependencies> | ||||
<group targetFramework="netstandard2.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="netstandard1.3"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="netstandard2.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="uap10.0"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="net452"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<group targetFramework="net461"> | |||||
<dependency id="MQTTnet" version="2.8.0-alpha5" /> | |||||
</group> | |||||
<dependency id="MQTTnet" version="2.8.0-beta1" /> | |||||
</dependencies> | </dependencies> | ||||
</metadata> | </metadata> | ||||
@@ -48,7 +48,7 @@ | |||||
<dependency id="System.Net.WebSockets.Client" version="4.3.2" /> | <dependency id="System.Net.WebSockets.Client" version="4.3.2" /> | ||||
</group> | </group> | ||||
<group targetFramework="uap10.0"> | <group targetFramework="uap10.0"> | ||||
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="5.4.1" /> | |||||
<dependency id="Microsoft.NETCore.UniversalWindowsPlatform" version="6.1.4" /> | |||||
</group> | </group> | ||||
<group targetFramework="net452"> | <group targetFramework="net452"> | ||||
</group> | </group> | ||||
@@ -36,8 +36,8 @@ if ($path) { | |||||
Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue | Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue | ||||
New-Item -ItemType Directory -Force -Path .\NuGet | New-Item -ItemType Directory -Force -Path .\NuGet | ||||
.\NuGet.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\NuGet.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\NuGet.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\NuGet.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\nuget.exe pack MQTTnet.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\nuget.exe pack MQTTnet.AspNetCore.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\nuget.exe pack MQTTnet.Extensions.Rpc.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
.\nuget.exe pack MQTTnet.Extensions.ManagedClient.nuspec -Verbosity detailed -Symbols -OutputDir "NuGet" -Version $nugetVersion | |||||
} | } |
@@ -0,0 +1,9 @@ | |||||
param([string]$apiKey) | |||||
$files = Get-ChildItem -Path ".\NuGet" -Filter "*.nupkg" | |||||
foreach ($file in $files) | |||||
{ | |||||
Write-Host "Uploading: " $file | |||||
.\nuget.exe push $file.Fullname $apiKey -NoSymbols -Source https://api.nuget.org/v3/index.json | |||||
} |
@@ -20,6 +20,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{67C28AC1 | |||||
Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec | Build\MQTTnet.Extensions.ManagedClient.nuspec = Build\MQTTnet.Extensions.ManagedClient.nuspec | ||||
Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec | Build\MQTTnet.Extensions.Rpc.nuspec = Build\MQTTnet.Extensions.Rpc.nuspec | ||||
Build\MQTTnet.nuspec = Build\MQTTnet.nuspec | Build\MQTTnet.nuspec = Build\MQTTnet.nuspec | ||||
Build\upload.ps1 = Build\upload.ps1 | |||||
EndProjectSection | EndProjectSection | ||||
EndProject | EndProject | ||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" | Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{B3F60ECB-45BA-4C66-8903-8BB89CA67998}" | ||||
@@ -74,6 +75,7 @@ Global | |||||
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU | {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.Build.0 = Debug|Any CPU | {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|Any CPU.Deploy.0 = Debug|Any CPU | |||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.ActiveCfg = Debug|ARM | {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.ActiveCfg = Debug|ARM | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Build.0 = Debug|ARM | {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Build.0 = Debug|ARM | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Deploy.0 = Debug|ARM | {FF1F72D6-9524-4422-9497-3CC0002216ED}.Debug|ARM.Deploy.0 = Debug|ARM | ||||
@@ -7,7 +7,6 @@ using System.Threading.Tasks; | |||||
using MQTTnet.Client; | using MQTTnet.Client; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Exceptions; | using MQTTnet.Exceptions; | ||||
using MQTTnet.Internal; | |||||
using MQTTnet.Protocol; | using MQTTnet.Protocol; | ||||
namespace MQTTnet.Extensions.ManagedClient | namespace MQTTnet.Extensions.ManagedClient | ||||
@@ -15,8 +14,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
public class ManagedMqttClient : IManagedMqttClient | public class ManagedMqttClient : IManagedMqttClient | ||||
{ | { | ||||
private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>(); | private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>(); | ||||
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>(); | |||||
private readonly AsyncLock _subscriptionsLock = new AsyncLock(); | |||||
private readonly ConcurrentDictionary<string, MqttQualityOfServiceLevel> _subscriptions = new ConcurrentDictionary<string, MqttQualityOfServiceLevel>(); | |||||
private readonly List<string> _unsubscriptions = new List<string>(); | private readonly List<string> _unsubscriptions = new List<string>(); | ||||
private readonly IMqttClient _mqttClient; | private readonly IMqttClient _mqttClient; | ||||
@@ -118,39 +116,36 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
_messageQueue.Add(applicationMessage); | _messageQueue.Add(applicationMessage); | ||||
} | } | ||||
public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) | |||||
public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters) | |||||
{ | { | ||||
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); | ||||
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
foreach (var topicFilter in topicFilters) | |||||
{ | { | ||||
foreach (var topicFilter in topicFilters) | |||||
{ | |||||
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; | |||||
_subscriptionsNotPushed = true; | |||||
} | |||||
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel; | |||||
_subscriptionsNotPushed = true; | |||||
} | } | ||||
return Task.FromResult(0); | |||||
} | } | ||||
public async Task UnsubscribeAsync(IEnumerable<string> topics) | |||||
public Task UnsubscribeAsync(IEnumerable<string> topics) | |||||
{ | { | ||||
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
foreach (var topic in topics) | |||||
{ | { | ||||
foreach (var topic in topics) | |||||
if (_subscriptions.TryRemove(topic, out _)) | |||||
{ | { | ||||
if (_subscriptions.Remove(topic)) | |||||
{ | |||||
_unsubscriptions.Add(topic); | |||||
_subscriptionsNotPushed = true; | |||||
} | |||||
_unsubscriptions.Add(topic); | |||||
_subscriptionsNotPushed = true; | |||||
} | } | ||||
} | } | ||||
return Task.FromResult(0); | |||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
_messageQueue?.Dispose(); | _messageQueue?.Dispose(); | ||||
_subscriptionsLock?.Dispose(); | |||||
_connectionCancellationToken?.Dispose(); | _connectionCancellationToken?.Dispose(); | ||||
_publishingCancellationToken?.Dispose(); | _publishingCancellationToken?.Dispose(); | ||||
} | } | ||||
@@ -289,7 +284,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
List<TopicFilter> subscriptions; | List<TopicFilter> subscriptions; | ||||
List<string> unsubscriptions; | List<string> unsubscriptions; | ||||
using (await _subscriptionsLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) | |||||
lock (_subscriptions) | |||||
{ | { | ||||
subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); | subscriptions = _subscriptions.Select(i => new TopicFilter(i.Key, i.Value)).ToList(); | ||||
@@ -18,6 +18,8 @@ namespace MQTTnet.Adapter | |||||
private const uint ErrorOperationAborted = 0x800703E3; | private const uint ErrorOperationAborted = 0x800703E3; | ||||
private const int ReadBufferSize = 4096; // TODO: Move buffer size to config | private const int ReadBufferSize = 4096; // TODO: Move buffer size to config | ||||
private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1); | |||||
private readonly IMqttNetChildLogger _logger; | private readonly IMqttNetChildLogger _logger; | ||||
private readonly IMqttChannel _channel; | private readonly IMqttChannel _channel; | ||||
@@ -88,6 +90,7 @@ namespace MQTTnet.Adapter | |||||
public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) | public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) | ||||
{ | { | ||||
await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); | |||||
try | try | ||||
{ | { | ||||
_logger.Verbose("TX >>> {0}", packet); | _logger.Verbose("TX >>> {0}", packet); | ||||
@@ -107,6 +110,10 @@ namespace MQTTnet.Adapter | |||||
WrapException(exception); | WrapException(exception); | ||||
} | } | ||||
finally | |||||
{ | |||||
_writerSemaphore.Release(); | |||||
} | |||||
} | } | ||||
public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) | public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) | ||||
@@ -9,7 +9,7 @@ namespace MQTTnet.Diagnostics | |||||
public MqttNetChildLogger(IMqttNetLogger logger, string source) | public MqttNetChildLogger(IMqttNetLogger logger, string source) | ||||
{ | { | ||||
_logger = logger; | |||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||||
_source = source; | _source = source; | ||||
} | } | ||||
@@ -40,7 +40,18 @@ namespace MQTTnet.Implementations | |||||
public static Func<MqttClientTcpOptions, IEnumerable<ChainValidationResult>> CustomIgnorableServerCertificateErrorsResolver { get; set; } | public static Func<MqttClientTcpOptions, IEnumerable<ChainValidationResult>> CustomIgnorableServerCertificateErrorsResolver { get; set; } | ||||
public string Endpoint => _socket?.Information?.RemoteAddress?.ToString(); // TODO: Check if contains also the port. | |||||
public string Endpoint | |||||
{ | |||||
get | |||||
{ | |||||
if (_socket?.Information != null) | |||||
{ | |||||
return _socket.Information.RemoteAddress + ":" + _socket.Information.RemotePort; | |||||
} | |||||
return null; | |||||
} | |||||
} | |||||
public async Task ConnectAsync(CancellationToken cancellationToken) | public async Task ConnectAsync(CancellationToken cancellationToken) | ||||
{ | { | ||||
@@ -81,10 +92,13 @@ namespace MQTTnet.Implementations | |||||
return _readStream.ReadAsync(buffer, offset, count, cancellationToken); | return _readStream.ReadAsync(buffer, offset, count, cancellationToken); | ||||
} | } | ||||
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||||
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||||
{ | { | ||||
await _writeStream.WriteAsync(buffer, offset, count, cancellationToken); | |||||
await _writeStream.FlushAsync(cancellationToken); | |||||
// In the write method only the internal buffer will be filled. So here is no | |||||
// async/await required. The real network transmit is done when calling the | |||||
// Flush method. | |||||
_writeStream.Write(buffer, offset, count); | |||||
return _writeStream.FlushAsync(cancellationToken); | |||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
@@ -1,7 +1,7 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">netstandard1.3;netstandard2.0;net452;uap10.0</TargetFrameworks> | |||||
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">netstandard1.3;netstandard2.0;net452;net461;uap10.0</TargetFrameworks> | |||||
<TargetFrameworks Condition=" '$(OS)' != 'Windows_NT' ">netstandard1.3;netstandard2.0</TargetFrameworks> | <TargetFrameworks Condition=" '$(OS)' != 'Windows_NT' ">netstandard1.3;netstandard2.0</TargetFrameworks> | ||||
<AssemblyName>MQTTnet</AssemblyName> | <AssemblyName>MQTTnet</AssemblyName> | ||||
<RootNamespace>MQTTnet</RootNamespace> | <RootNamespace>MQTTnet</RootNamespace> | ||||
@@ -62,4 +62,7 @@ | |||||
<ItemGroup Condition="'$(TargetFramework)'=='net452'"> | <ItemGroup Condition="'$(TargetFramework)'=='net452'"> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup Condition="'$(TargetFramework)'=='net461'"> | |||||
</ItemGroup> | |||||
</Project> | </Project> |
@@ -11,8 +11,7 @@ namespace MQTTnet.Serializer | |||||
{ | { | ||||
private const int FixedHeaderSize = 1; | private const int FixedHeaderSize = 1; | ||||
[ThreadStatic] | |||||
private static MqttPacketWriter _packetWriter; | |||||
private readonly MqttPacketWriter _packetWriter = new MqttPacketWriter(); | |||||
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | ||||
@@ -20,14 +19,12 @@ namespace MQTTnet.Serializer | |||||
{ | { | ||||
if (packet == null) throw new ArgumentNullException(nameof(packet)); | if (packet == null) throw new ArgumentNullException(nameof(packet)); | ||||
var packetWriter = InitializePacketWriter(); | |||||
// Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) | // Leave enough head space for max header size (fixed + 4 variable remaining length = 5 bytes) | ||||
packetWriter.Reset(); | |||||
packetWriter.Seek(5); | |||||
_packetWriter.Reset(); | |||||
_packetWriter.Seek(5); | |||||
var fixedHeader = SerializePacket(packet, packetWriter); | |||||
var remainingLength = packetWriter.Length - 5; | |||||
var fixedHeader = SerializePacket(packet, _packetWriter); | |||||
var remainingLength = _packetWriter.Length - 5; | |||||
var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength); | var remainingLengthBuffer = MqttPacketWriter.EncodeRemainingLength(remainingLength); | ||||
@@ -35,12 +32,12 @@ namespace MQTTnet.Serializer | |||||
var headerOffset = 5 - headerSize; | var headerOffset = 5 - headerSize; | ||||
// Position cursor on correct offset on beginining of array (has leading 0x0) | // Position cursor on correct offset on beginining of array (has leading 0x0) | ||||
packetWriter.Seek(headerOffset); | |||||
packetWriter.Write(fixedHeader); | |||||
packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); | |||||
_packetWriter.Seek(headerOffset); | |||||
_packetWriter.Write(fixedHeader); | |||||
_packetWriter.Write(remainingLengthBuffer.Array, remainingLengthBuffer.Offset, remainingLengthBuffer.Count); | |||||
var buffer = packetWriter.GetBuffer(); | |||||
return new ArraySegment<byte>(buffer, headerOffset, packetWriter.Length - headerOffset); | |||||
var buffer = _packetWriter.GetBuffer(); | |||||
return new ArraySegment<byte>(buffer, headerOffset, _packetWriter.Length - headerOffset); | |||||
} | } | ||||
public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket) | public MqttBasePacket Deserialize(ReceivedMqttPacket receivedMqttPacket) | ||||
@@ -76,7 +73,7 @@ namespace MQTTnet.Serializer | |||||
public void FreeBuffer() | public void FreeBuffer() | ||||
{ | { | ||||
InitializePacketWriter().FreeBuffer(); | |||||
_packetWriter.FreeBuffer(); | |||||
} | } | ||||
private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter packetWriter) | private byte SerializePacket(MqttBasePacket packet, MqttPacketWriter packetWriter) | ||||
@@ -101,16 +98,6 @@ namespace MQTTnet.Serializer | |||||
} | } | ||||
} | } | ||||
private static MqttPacketWriter InitializePacketWriter() | |||||
{ | |||||
if (_packetWriter == null) | |||||
{ | |||||
_packetWriter = new MqttPacketWriter(); | |||||
} | |||||
return _packetWriter; | |||||
} | |||||
private static MqttBasePacket DeserializeUnsubAck(MqttPacketBodyReader body) | private static MqttBasePacket DeserializeUnsubAck(MqttPacketBodyReader body) | ||||
{ | { | ||||
ThrowIfBodyIsEmpty(body); | ThrowIfBodyIsEmpty(body); | ||||
@@ -12,6 +12,8 @@ namespace MQTTnet.Serializer | |||||
/// </summary> | /// </summary> | ||||
public class MqttPacketWriter | public class MqttPacketWriter | ||||
{ | { | ||||
public static int MaxBufferSize = 4096; | |||||
private byte[] _buffer = new byte[128]; | private byte[] _buffer = new byte[128]; | ||||
private int _position; | private int _position; | ||||
@@ -117,12 +119,12 @@ namespace MQTTnet.Serializer | |||||
// a lot and the size will never reduced. So this method tries to find a size which can be held in | // a lot and the size will never reduced. So this method tries to find a size which can be held in | ||||
// memory for a long time without causing troubles. | // memory for a long time without causing troubles. | ||||
if (_buffer.Length < 4096) | |||||
if (_buffer.Length < MaxBufferSize) | |||||
{ | { | ||||
return; | return; | ||||
} | } | ||||
Array.Resize(ref _buffer, 4096); | |||||
Array.Resize(ref _buffer, MaxBufferSize); | |||||
} | } | ||||
private void EnsureAdditionalCapacity(int additionalCapacity) | private void EnsureAdditionalCapacity(int additionalCapacity) | ||||
@@ -148,7 +148,7 @@ namespace MQTTnet.Server | |||||
clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage); | clientSession.EnqueueApplicationMessage(enqueuedApplicationMessage.Sender, applicationMessage); | ||||
} | } | ||||
} | } | ||||
catch (TaskCanceledException) | |||||
catch (OperationCanceledException) | |||||
{ | { | ||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
@@ -4,6 +4,8 @@ | |||||
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" | xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" | ||||
xmlns:d="http://schemas.microsoft.com/expression/blend/2008" | xmlns:d="http://schemas.microsoft.com/expression/blend/2008" | ||||
xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" | xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" | ||||
xmlns:server="using:MQTTnet.Server" | |||||
xmlns:interop="using:Windows.UI.Xaml.Interop" | |||||
d:DesignHeight="800" | d:DesignHeight="800" | ||||
d:DesignWidth="800" | d:DesignWidth="800" | ||||
mc:Ignorable="d"> | mc:Ignorable="d"> | ||||
@@ -31,7 +33,7 @@ | |||||
<CheckBox x:Name="CleanSession" IsChecked="True"></CheckBox> | <CheckBox x:Name="CleanSession" IsChecked="True"></CheckBox> | ||||
<TextBlock>Keep alive interval:</TextBlock> | <TextBlock>Keep alive interval:</TextBlock> | ||||
<TextBox x:Name="KeepAliveInterval" Text="5"></TextBox> | <TextBox x:Name="KeepAliveInterval" Text="5"></TextBox> | ||||
<StackPanel Orientation="Horizontal"> | <StackPanel Orientation="Horizontal"> | ||||
<RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton> | <RadioButton x:Name="UseTcp" IsChecked="True" GroupName="connection">TCP</RadioButton> | ||||
<RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton> | <RadioButton x:Name="UseWs" GroupName="connection">WS</RadioButton> | ||||
@@ -142,6 +144,7 @@ | |||||
<CheckBox x:Name="ServerPersistRetainedMessages" IsChecked="True">Persist retained messages in JSON format</CheckBox> | <CheckBox x:Name="ServerPersistRetainedMessages" IsChecked="True">Persist retained messages in JSON format</CheckBox> | ||||
<CheckBox x:Name="ServerClearRetainedMessages">Clear previously retained messages on startup</CheckBox> | <CheckBox x:Name="ServerClearRetainedMessages">Clear previously retained messages on startup</CheckBox> | ||||
<CheckBox x:Name="ServerAllowPersistentSessions">Allow persistent sessions</CheckBox> | |||||
<StackPanel Orientation="Horizontal"> | <StackPanel Orientation="Horizontal"> | ||||
<Button Width="120" Margin="0,0,10,0" Click="StartServer">Start</Button> | <Button Width="120" Margin="0,0,10,0" Click="StartServer">Start</Button> | ||||
@@ -149,11 +152,6 @@ | |||||
</StackPanel> | </StackPanel> | ||||
</StackPanel> | </StackPanel> | ||||
</PivotItem> | </PivotItem> | ||||
<PivotItem Header="Log"> | |||||
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}"> | |||||
<Button Click="ClearLog" Width="120">Clear</Button> | |||||
</StackPanel> | |||||
</PivotItem> | |||||
<PivotItem Header="Sessions"> | <PivotItem Header="Sessions"> | ||||
<StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}"> | <StackPanel Background="{ThemeResource ApplicationPageBackgroundThemeBrush}"> | ||||
@@ -162,12 +160,23 @@ | |||||
<Button Width="120" Margin="0,0,10,0" Click="ClearSessions">Clear</Button> | <Button Width="120" Margin="0,0,10,0" Click="ClearSessions">Clear</Button> | ||||
</StackPanel> | </StackPanel> | ||||
<ListView> | |||||
<ListView ItemsSource="{Binding}" x:Name="ListViewSessions"> | |||||
<ListView.ItemTemplate> | <ListView.ItemTemplate> | ||||
<DataTemplate> | |||||
<Grid> | |||||
<TextBlock></TextBlock> | |||||
</Grid> | |||||
<DataTemplate x:DataType="server:IMqttClientSessionStatus"> | |||||
<StackPanel Orientation="Horizontal"> | |||||
<StackPanel.Resources> | |||||
<Style TargetType="TextBlock"> | |||||
<Setter Property="Margin" Value="0,0,10,0"></Setter> | |||||
</Style> | |||||
</StackPanel.Resources> | |||||
<TextBlock Text="{Binding ClientId}"></TextBlock> | |||||
<TextBlock Text="{Binding Endpoint}"></TextBlock> | |||||
<TextBlock Text="{Binding IsConnected}"></TextBlock> | |||||
<TextBlock Text="{Binding LastPacketReceived}"></TextBlock> | |||||
<TextBlock Text="{Binding LastNonKeepAlivePacketReceived}"></TextBlock> | |||||
<TextBlock Text="{Binding ProtocolVersion}"></TextBlock> | |||||
<TextBlock Text="{Binding PendingApplicationMessagesCount}"></TextBlock> | |||||
</StackPanel> | |||||
</DataTemplate> | </DataTemplate> | ||||
</ListView.ItemTemplate> | </ListView.ItemTemplate> | ||||
</ListView> | </ListView> | ||||
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using System.Collections.Concurrent; | using System.Collections.Concurrent; | ||||
using System.Collections.ObjectModel; | |||||
using System.Text; | using System.Text; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using Windows.Security.Cryptography.Certificates; | using Windows.Security.Cryptography.Certificates; | ||||
@@ -21,6 +22,7 @@ namespace MQTTnet.TestApp.UniversalWindows | |||||
public sealed partial class MainPage | public sealed partial class MainPage | ||||
{ | { | ||||
private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>(); | private readonly ConcurrentQueue<MqttNetLogMessage> _traceMessages = new ConcurrentQueue<MqttNetLogMessage>(); | ||||
private readonly ObservableCollection<IMqttClientSessionStatus> _sessions = new ObservableCollection<IMqttClientSessionStatus>(); | |||||
private IMqttClient _mqttClient; | private IMqttClient _mqttClient; | ||||
private IMqttServer _mqttServer; | private IMqttServer _mqttServer; | ||||
@@ -306,6 +308,7 @@ namespace MQTTnet.TestApp.UniversalWindows | |||||
var options = new MqttServerOptions(); | var options = new MqttServerOptions(); | ||||
options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); | options.DefaultEndpointOptions.Port = int.Parse(ServerPort.Text); | ||||
options.Storage = storage; | options.Storage = storage; | ||||
options.EnablePersistentSessions = ServerAllowPersistentSessions.IsChecked == true; | |||||
await _mqttServer.StartAsync(options); | await _mqttServer.StartAsync(options); | ||||
} | } | ||||
@@ -374,10 +377,25 @@ namespace MQTTnet.TestApp.UniversalWindows | |||||
private void ClearSessions(object sender, RoutedEventArgs e) | private void ClearSessions(object sender, RoutedEventArgs e) | ||||
{ | { | ||||
_sessions.Clear(); | |||||
} | } | ||||
private void RefreshSessions(object sender, RoutedEventArgs e) | |||||
private async void RefreshSessions(object sender, RoutedEventArgs e) | |||||
{ | { | ||||
if (_mqttServer == null) | |||||
{ | |||||
return; | |||||
} | |||||
var sessions = await _mqttServer.GetClientSessionsStatusAsync(); | |||||
_sessions.Clear(); | |||||
foreach (var session in sessions) | |||||
{ | |||||
_sessions.Add(session); | |||||
} | |||||
ListViewSessions.DataContext = _sessions; | |||||
} | } | ||||
private async Task WikiCode() | private async Task WikiCode() | ||||