Browse Source

Performance optimizations

release/3.x.x
Christian Kratky 7 years ago
parent
commit
9816f12df6
16 changed files with 172 additions and 136 deletions
  1. +3
    -3
      Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj
  2. +2
    -2
      Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
  3. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
  4. +13
    -11
      MQTTnet.Core/Client/MqttClient.cs
  5. +32
    -0
      MQTTnet.Core/Internal/AsyncAutoResetEvent.cs
  6. +3
    -3
      MQTTnet.Core/MQTTnet.Core.csproj
  7. +13
    -7
      MQTTnet.Core/Serializer/ByteReader.cs
  8. +1
    -1
      MQTTnet.Core/Serializer/ByteWriter.cs
  9. +51
    -66
      MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
  10. +3
    -2
      MQTTnet.Core/Serializer/MqttPacketReader.cs
  11. +30
    -32
      MQTTnet.Core/Serializer/MqttPacketWriter.cs
  12. +1
    -1
      MQTTnet.Core/Server/MqttClientSession.cs
  13. +1
    -1
      MQTTnet.Core/Server/MqttClientSessionManager.cs
  14. +10
    -4
      MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs
  15. +1
    -1
      MQTTnet.nuspec
  16. +6
    -0
      README.md

+ 3
- 3
Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj View File

@@ -7,9 +7,9 @@
<Product>MQTTnet</Product> <Product>MQTTnet</Product>
<Description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</Description> <Description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</Description>
<Copyright>Copyright © Christian Kratky 2016-2017</Copyright> <Copyright>Copyright © Christian Kratky 2016-2017</Copyright>
<Version>2.1.0.4</Version>
<AssemblyVersion>2.1.0.11</AssemblyVersion>
<FileVersion>2.1.0.11</FileVersion>
<Version>2.1.1.0</Version>
<AssemblyVersion>2.1.1.0</AssemblyVersion>
<FileVersion>2.1.1.0</FileVersion>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild> <GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<AssemblyName>MQTTnet</AssemblyName> <AssemblyName>MQTTnet</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace> <RootNamespace>MQTTnet</RootNamespace>


+ 2
- 2
Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs View File

@@ -11,5 +11,5 @@ using System.Runtime.InteropServices;
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]
[assembly: ComVisible(false)] [assembly: ComVisible(false)]
[assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")]
[assembly: AssemblyVersion("2.1.0.11")]
[assembly: AssemblyFileVersion("2.1.0.11")]
[assembly: AssemblyVersion("2.1.1.0")]
[assembly: AssemblyFileVersion("2.1.1.0")]

+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs View File

@@ -10,5 +10,5 @@ using System.Runtime.InteropServices;
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]
[assembly: ComVisible(false)] [assembly: ComVisible(false)]
[assembly: AssemblyVersion("2.1.0.11")]
[assembly: AssemblyFileVersion("2.1.0.11")]
[assembly: AssemblyVersion("2.1.1.0")]
[assembly: AssemblyFileVersion("2.1.1.0")]

+ 13
- 11
MQTTnet.Core/Client/MqttClient.cs View File

@@ -88,11 +88,11 @@ namespace MQTTnet.Core.Client
await DisconnectInternalAsync(); await DisconnectInternalAsync();
} }


public async Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters)
public Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters)
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));


return await SubscribeAsync(topicFilters.ToList());
return SubscribeAsync(topicFilters.ToList());
} }


public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter> topicFilters) public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter> topicFilters)
@@ -117,11 +117,11 @@ namespace MQTTnet.Core.Client
return topicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); return topicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
} }


public async Task Unsubscribe(params string[] topicFilters)
public Task Unsubscribe(params string[] topicFilters)
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));


await Unsubscribe(topicFilters.ToList());
return Unsubscribe(topicFilters.ToList());
} }


public async Task Unsubscribe(IList<string> topicFilters) public async Task Unsubscribe(IList<string> topicFilters)
@@ -274,14 +274,14 @@ namespace MQTTnet.Core.Client
FireApplicationMessageReceivedEvent(originalPublishPacket); FireApplicationMessageReceivedEvent(originalPublishPacket);
} }


private async Task SendAsync(MqttBasePacket packet)
private Task SendAsync(MqttBasePacket packet)
{ {
await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
} }


private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{ {
Func<MqttBasePacket, bool> responsePacketSelector = p =>
bool ResponsePacketSelector(MqttBasePacket p)
{ {
var p1 = p as TResponsePacket; var p1 = p as TResponsePacket;
if (p1 == null) if (p1 == null)
@@ -301,10 +301,10 @@ namespace MQTTnet.Core.Client
} }


return true; return true;
};
}


await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout);
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout);
} }


private ushort GetNewPacketIdentifier() private ushort GetNewPacketIdentifier()
@@ -324,8 +324,9 @@ namespace MQTTnet.Core.Client
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()); await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket());
} }
} }
catch (MqttCommunicationException)
catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -351,8 +352,9 @@ namespace MQTTnet.Core.Client
Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget(); Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget();
} }
} }
catch (MqttCommunicationException)
catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 32
- 0
MQTTnet.Core/Internal/AsyncAutoResetEvent.cs View File

@@ -0,0 +1,32 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Core.Internal
{
public class AsyncGate
{
private readonly Queue<TaskCompletionSource<bool>> _waitingTasks = new Queue<TaskCompletionSource<bool>>();

public Task WaitOneAsync()
{
var tcs = new TaskCompletionSource<bool>();
lock (_waitingTasks)
{
_waitingTasks.Enqueue(tcs);
}

return tcs.Task;
}

public void Set()
{
lock (_waitingTasks)
{
if (_waitingTasks.Count > 0)
{
_waitingTasks.Dequeue().SetResult(true);
}
}
}
}
}

+ 3
- 3
MQTTnet.Core/MQTTnet.Core.csproj View File

@@ -9,15 +9,15 @@
<Product>MQTTnet</Product> <Product>MQTTnet</Product>
<Company>Christian Kratky</Company> <Company>Christian Kratky</Company>
<Authors>Christian Kratky</Authors> <Authors>Christian Kratky</Authors>
<Version>2.1.0.11</Version>
<Version>2.1.1.0</Version>
<PackageId>MQTTnet.Core</PackageId> <PackageId>MQTTnet.Core</PackageId>
<Copyright>Copyright © Christian Kratky 2016-2017</Copyright> <Copyright>Copyright © Christian Kratky 2016-2017</Copyright>
<PackageProjectUrl>https://github.com/chkr1011/MQTTnet</PackageProjectUrl> <PackageProjectUrl>https://github.com/chkr1011/MQTTnet</PackageProjectUrl>
<PackageIconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</PackageIconUrl> <PackageIconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</PackageIconUrl>
<RepositoryUrl>https://github.com/chkr1011/MQTTnet</RepositoryUrl> <RepositoryUrl>https://github.com/chkr1011/MQTTnet</RepositoryUrl>
<PackageTags>MQTT MQTTClient MQTTServer MQTTBroker Broker</PackageTags> <PackageTags>MQTT MQTTClient MQTTServer MQTTBroker Broker</PackageTags>
<FileVersion>2.1.0.11</FileVersion>
<AssemblyVersion>2.1.0.11</AssemblyVersion>
<FileVersion>2.1.1.0</FileVersion>
<AssemblyVersion>2.1.1.0</AssemblyVersion>
<PackageLicenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</PackageLicenseUrl> <PackageLicenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</PackageLicenseUrl>
</PropertyGroup> </PropertyGroup>




+ 13
- 7
MQTTnet.Core/Serializer/ByteReader.cs View File

@@ -4,36 +4,42 @@ namespace MQTTnet.Core.Serializer
{ {
public class ByteReader public class ByteReader
{ {
private readonly int _source;
private int _index; private int _index;
private readonly int _byte;


public ByteReader(byte @byte)
public ByteReader(int source)
{ {
_byte = @byte;
_source = source;
} }


public bool Read() public bool Read()
{ {
if (_index >= 8) if (_index >= 8)
{ {
throw new InvalidOperationException("End of the byte reached.");
throw new InvalidOperationException("End of byte reached.");
} }


var result = ((1 << _index) & _byte) > 0;
var result = ((1 << _index) & _source) > 0;
_index++; _index++;

return result; return result;
} }


public byte Read(int count) public byte Read(int count)
{ {
if (_index + count > 8)
{
throw new InvalidOperationException("End of byte will be reached.");
}

var result = 0; var result = 0;
for (var i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {
if (Read())
if (((1 << _index) & _source) > 0)
{ {
result |= 1 << i; result |= 1 << i;
} }

_index++;
} }


return (byte)result; return (byte)result;


+ 1
- 1
MQTTnet.Core/Serializer/ByteWriter.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Core.Serializer


public byte Value => (byte)_byte; public byte Value => (byte)_byte;


public void Write(byte @byte, int count)
public void Write(int @byte, int count)
{ {
for (var i = 0; i < count; i++) for (var i = 0; i < count; i++)
{ {


+ 51
- 66
MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs View File

@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Serializer
{ {
public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer
{ {
public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{ {
if (packet == null) throw new ArgumentNullException(nameof(packet)); if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination)); if (destination == null) throw new ArgumentNullException(nameof(destination));
@@ -20,99 +20,85 @@ namespace MQTTnet.Core.Serializer
var connectPacket = packet as MqttConnectPacket; var connectPacket = packet as MqttConnectPacket;
if (connectPacket != null) if (connectPacket != null)
{ {
await SerializeAsync(connectPacket, destination);
return;
return SerializeAsync(connectPacket, destination);
} }


var connAckPacket = packet as MqttConnAckPacket; var connAckPacket = packet as MqttConnAckPacket;
if (connAckPacket != null) if (connAckPacket != null)
{ {
await SerializeAsync(connAckPacket, destination);
return;
return SerializeAsync(connAckPacket, destination);
} }


var disconnectPacket = packet as MqttDisconnectPacket; var disconnectPacket = packet as MqttDisconnectPacket;
if (disconnectPacket != null) if (disconnectPacket != null)
{ {
await SerializeAsync(disconnectPacket, destination);
return;
return SerializeAsync(disconnectPacket, destination);
} }


var pingReqPacket = packet as MqttPingReqPacket; var pingReqPacket = packet as MqttPingReqPacket;
if (pingReqPacket != null) if (pingReqPacket != null)
{ {
await SerializeAsync(pingReqPacket, destination);
return;
return SerializeAsync(pingReqPacket, destination);
} }


var pingRespPacket = packet as MqttPingRespPacket; var pingRespPacket = packet as MqttPingRespPacket;
if (pingRespPacket != null) if (pingRespPacket != null)
{ {
await SerializeAsync(pingRespPacket, destination);
return;
return SerializeAsync(pingRespPacket, destination);
} }


var publishPacket = packet as MqttPublishPacket; var publishPacket = packet as MqttPublishPacket;
if (publishPacket != null) if (publishPacket != null)
{ {
await SerializeAsync(publishPacket, destination);
return;
return SerializeAsync(publishPacket, destination);
} }


var pubAckPacket = packet as MqttPubAckPacket; var pubAckPacket = packet as MqttPubAckPacket;
if (pubAckPacket != null) if (pubAckPacket != null)
{ {
await SerializeAsync(pubAckPacket, destination);
return;
return SerializeAsync(pubAckPacket, destination);
} }


var pubRecPacket = packet as MqttPubRecPacket; var pubRecPacket = packet as MqttPubRecPacket;
if (pubRecPacket != null) if (pubRecPacket != null)
{ {
await SerializeAsync(pubRecPacket, destination);
return;
return SerializeAsync(pubRecPacket, destination);
} }


var pubRelPacket = packet as MqttPubRelPacket; var pubRelPacket = packet as MqttPubRelPacket;
if (pubRelPacket != null) if (pubRelPacket != null)
{ {
await SerializeAsync(pubRelPacket, destination);
return;
return SerializeAsync(pubRelPacket, destination);
} }


var pubCompPacket = packet as MqttPubCompPacket; var pubCompPacket = packet as MqttPubCompPacket;
if (pubCompPacket != null) if (pubCompPacket != null)
{ {
await SerializeAsync(pubCompPacket, destination);
return;
return SerializeAsync(pubCompPacket, destination);
} }


var subscribePacket = packet as MqttSubscribePacket; var subscribePacket = packet as MqttSubscribePacket;
if (subscribePacket != null) if (subscribePacket != null)
{ {
await SerializeAsync(subscribePacket, destination);
return;
return SerializeAsync(subscribePacket, destination);
} }


var subAckPacket = packet as MqttSubAckPacket; var subAckPacket = packet as MqttSubAckPacket;
if (subAckPacket != null) if (subAckPacket != null)
{ {
await SerializeAsync(subAckPacket, destination);
return;
return SerializeAsync(subAckPacket, destination);
} }


var unsubscribePacket = packet as MqttUnsubscribePacket; var unsubscribePacket = packet as MqttUnsubscribePacket;
if (unsubscribePacket != null) if (unsubscribePacket != null)
{ {
await SerializeAsync(unsubscribePacket, destination);
return;
return SerializeAsync(unsubscribePacket, destination);
} }


var unsubAckPacket = packet as MqttUnsubAckPacket; var unsubAckPacket = packet as MqttUnsubAckPacket;
if (unsubAckPacket != null) if (unsubAckPacket != null)
{ {
await SerializeAsync(unsubAckPacket, destination);
return;
return SerializeAsync(unsubAckPacket, destination);
} }


throw new MqttProtocolViolationException("Packet type invalid."); throw new MqttProtocolViolationException("Packet type invalid.");
@@ -287,6 +273,7 @@ namespace MQTTnet.Core.Serializer


await reader.ReadRemainingDataByteAsync(); await reader.ReadRemainingDataByteAsync();
await reader.ReadRemainingDataByteAsync(); await reader.ReadRemainingDataByteAsync();

var protocolName = await reader.ReadRemainingDataAsync(4); var protocolName = await reader.ReadRemainingDataAsync(4);


if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
@@ -382,19 +369,17 @@ namespace MQTTnet.Core.Serializer
} }
} }


private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");

private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{ {
ValidateConnectPacket(packet); ValidateConnectPacket(packet);


using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
// Write variable header // Write variable header
output.Write(0x00); // 3.1.2.1 Protocol Name
output.Write(0x04); // ""
output.Write('M');
output.Write('Q');
output.Write('T');
output.Write('T');
output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
output.Write(MqttPrefix);
output.Write(0x04); // 3.1.2.2 Protocol Level output.Write(0x04); // 3.1.2.2 Protocol Level


var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
@@ -404,7 +389,7 @@ namespace MQTTnet.Core.Serializer


if (packet.WillMessage != null) if (packet.WillMessage != null)
{ {
connectFlags.Write((byte)packet.WillMessage.QualityOfServiceLevel, 2);
connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
connectFlags.Write(packet.WillMessage.Retain); connectFlags.Write(packet.WillMessage.Retain);
} }
else else
@@ -412,7 +397,7 @@ namespace MQTTnet.Core.Serializer
connectFlags.Write(0, 2); connectFlags.Write(0, 2);
connectFlags.Write(false); connectFlags.Write(false);
} }
connectFlags.Write(packet.Password != null); connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null); connectFlags.Write(packet.Username != null);


@@ -437,11 +422,11 @@ namespace MQTTnet.Core.Serializer
} }


output.InjectFixedHeader(MqttControlPacketType.Connect); output.InjectFixedHeader(MqttControlPacketType.Connect);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
@@ -452,26 +437,26 @@ namespace MQTTnet.Core.Serializer
output.Write((byte)packet.ConnectReturnCode); output.Write((byte)packet.ConnectReturnCode);


output.InjectFixedHeader(MqttControlPacketType.ConnAck); output.InjectFixedHeader(MqttControlPacketType.ConnAck);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{ {
await SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
} }


private async Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
{ {
await SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
} }


private async Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
{ {
await SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
} }


private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
{ {
ValidatePublishPacket(packet); ValidatePublishPacket(packet);


@@ -502,29 +487,29 @@ namespace MQTTnet.Core.Serializer
fixedHeader.Write(packet.Dup); fixedHeader.Write(packet.Dup);


output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value); output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubAck); output.InjectFixedHeader(MqttControlPacketType.PubAck);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubRec); output.InjectFixedHeader(MqttControlPacketType.PubRec);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


@@ -539,24 +524,24 @@ namespace MQTTnet.Core.Serializer
} }
} }


private async Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.PubComp); output.InjectFixedHeader(MqttControlPacketType.PubComp);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


if (packet.TopicFilters?.Any() == true)
if (packet.TopicFilters?.Count > 0)
{ {
foreach (var topicFilter in packet.TopicFilters) foreach (var topicFilter in packet.TopicFilters)
{ {
@@ -566,11 +551,11 @@ namespace MQTTnet.Core.Serializer
} }


output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02); output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
@@ -585,11 +570,11 @@ namespace MQTTnet.Core.Serializer
} }


output.InjectFixedHeader(MqttControlPacketType.SubAck); output.InjectFixedHeader(MqttControlPacketType.SubAck);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
@@ -604,27 +589,27 @@ namespace MQTTnet.Core.Serializer
} }


output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02); output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.Write(packet.PacketIdentifier); output.Write(packet.PacketIdentifier);


output.InjectFixedHeader(MqttControlPacketType.UnsubAck); output.InjectFixedHeader(MqttControlPacketType.UnsubAck);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }


private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
{ {
using (var output = new MqttPacketWriter()) using (var output = new MqttPacketWriter())
{ {
output.InjectFixedHeader(type); output.InjectFixedHeader(type);
await output.WriteToAsync(destination);
return output.WriteToAsync(destination);
} }
} }
} }


+ 3
- 2
MQTTnet.Core/Serializer/MqttPacketReader.cs View File

@@ -107,15 +107,16 @@ namespace MQTTnet.Core.Serializer
return await ReadRemainingDataAsync(length); return await ReadRemainingDataAsync(length);
} }


public async Task<byte[]> ReadRemainingDataAsync()
public Task<byte[]> ReadRemainingDataAsync()
{ {
return await ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position);
return ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position);
} }


public async Task<byte[]> ReadRemainingDataAsync(int length) public async Task<byte[]> ReadRemainingDataAsync(int length)
{ {
var buffer = new byte[length]; var buffer = new byte[length];
await _remainingData.ReadAsync(buffer, 0, buffer.Length); await _remainingData.ReadAsync(buffer, 0, buffer.Length);

return buffer; return buffer;
} }




+ 30
- 32
MQTTnet.Core/Serializer/MqttPacketWriter.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Core.Serializer
{ {
public sealed class MqttPacketWriter : IDisposable public sealed class MqttPacketWriter : IDisposable
{ {
private readonly MemoryStream _buffer = new MemoryStream();
private readonly MemoryStream _buffer = new MemoryStream(512);


public void InjectFixedHeader(byte fixedHeader) public void InjectFixedHeader(byte fixedHeader)
{ {
@@ -20,31 +20,28 @@ namespace MQTTnet.Core.Serializer
return; return;
} }


var backupBuffer = _buffer.ToArray();
var remainingLength = (int)_buffer.Length; var remainingLength = (int)_buffer.Length;
using (var buffer = new MemoryStream())
{
_buffer.WriteTo(buffer);
_buffer.SetLength(0);


_buffer.WriteByte(fixedHeader);
_buffer.SetLength(0);

_buffer.WriteByte(fixedHeader);


// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = remainingLength;
do
// Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
var x = remainingLength;
do
{
var encodedByte = x % 128;
x = x / 128;
if (x > 0)
{ {
var encodedByte = (byte)(x % 128);
x = x / 128;
if (x > 0)
{
encodedByte = (byte)(encodedByte | 128);
}

_buffer.WriteByte(encodedByte);
} while (x > 0);

buffer.Position = 0;
buffer.WriteTo(_buffer);
}
encodedByte = encodedByte | 128;
}

_buffer.WriteByte((byte)encodedByte);
} while (x > 0);

_buffer.Write(backupBuffer, 0, backupBuffer.Length);
} }


public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
@@ -59,11 +56,6 @@ namespace MQTTnet.Core.Serializer
_buffer.WriteByte(value); _buffer.WriteByte(value);
} }


public void Write(char value)
{
_buffer.WriteByte((byte)value);
}

public void Write(ushort value) public void Write(ushort value)
{ {
var buffer = BitConverter.GetBytes(value); var buffer = BitConverter.GetBytes(value);
@@ -73,11 +65,15 @@ namespace MQTTnet.Core.Serializer


public void Write(ByteWriter value) public void Write(ByteWriter value)
{ {
if (value == null) throw new ArgumentNullException(nameof(value));

_buffer.WriteByte(value.Value); _buffer.WriteByte(value.Value);
} }


public void Write(byte[] value)
public void Write(params byte[] value)
{ {
if (value == null) throw new ArgumentNullException(nameof(value));

_buffer.Write(value, 0, value.Length); _buffer.Write(value, 0, value.Length);
} }


@@ -94,14 +90,16 @@ namespace MQTTnet.Core.Serializer
Write(value); Write(value);
} }


public void Dispose()
public Task WriteToAsync(IMqttCommunicationChannel destination)
{ {
_buffer?.Dispose();
if (destination == null) throw new ArgumentNullException(nameof(destination));

return destination.WriteAsync(_buffer.ToArray());
} }


public async Task WriteToAsync(IMqttCommunicationChannel destination)
public void Dispose()
{ {
await destination.WriteAsync(_buffer.ToArray());
_buffer?.Dispose();
} }
} }
} }

+ 1
- 1
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -74,7 +74,7 @@ namespace MQTTnet.Core.Server
} }
} }


public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{ {
if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));


+ 1
- 1
MQTTnet.Core/Server/MqttClientSessionManager.cs View File

@@ -86,7 +86,7 @@ namespace MQTTnet.Core.Server
{ {
foreach (var clientSession in _clientSessions.Values.ToList()) foreach (var clientSession in _clientSessions.Values.ToList())
{ {
clientSession.DispatchPublishPacket(senderClientSession, publishPacket);
clientSession.EnqueuePublishPacket(senderClientSession, publishPacket);
} }
} }
} }


+ 10
- 4
MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs View File

@@ -13,8 +13,8 @@ namespace MQTTnet.Core.Server
{ {
public class MqttOutgoingPublicationsManager public class MqttOutgoingPublicationsManager
{ {
private readonly AutoResetEvent _resetEvent = new AutoResetEvent(false);
private readonly List<MqttClientPublishPacketContext> _pendingPublishPackets = new List<MqttClientPublishPacketContext>(); private readonly List<MqttClientPublishPacketContext> _pendingPublishPackets = new List<MqttClientPublishPacketContext>();
private readonly AsyncGate _gate = new AsyncGate();


private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
@@ -35,11 +35,12 @@ namespace MQTTnet.Core.Server
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


Task.Run(async () => await SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
} }


public void Stop() public void Stop()
{ {
_adapter = null;
_cancellationTokenSource?.Cancel(); _cancellationTokenSource?.Cancel();
_cancellationTokenSource = null; _cancellationTokenSource = null;
} }
@@ -52,7 +53,7 @@ namespace MQTTnet.Core.Server
lock (_pendingPublishPackets) lock (_pendingPublishPackets)
{ {
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket)); _pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket));
_resetEvent.Set();
_gate.Set();
} }
} }


@@ -62,12 +63,17 @@ namespace MQTTnet.Core.Server
{ {
try try
{ {
_resetEvent.WaitOne();
await _gate.WaitOneAsync();
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested)
{ {
return; return;
} }


if (_adapter == null)
{
continue;
}

List<MqttClientPublishPacketContext> pendingPublishPackets; List<MqttClientPublishPacketContext> pendingPublishPackets;
lock (_pendingPublishPackets) lock (_pendingPublishPackets)
{ {


+ 1
- 1
MQTTnet.nuspec View File

@@ -2,7 +2,7 @@
<package > <package >
<metadata> <metadata>
<id>MQTTnet</id> <id>MQTTnet</id>
<version>2.1.0.11</version>
<version>2.1.1.0</version>
<authors>Christian Kratky</authors> <authors>Christian Kratky</authors>
<owners>Christian Kratky</owners> <owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl> <licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>


+ 6
- 0
README.md View File

@@ -17,6 +17,12 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien


* 3.1.1 * 3.1.1


## Nuget
This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/

## Contributions
If you want to contribute to this project just create a pull request.

# MqttClient # MqttClient
## Example ## Example




Loading…
Cancel
Save