Browse Source

Optimize task management

release/3.x.x
Christian Kratky 7 years ago
parent
commit
314fca3f89
2 changed files with 24 additions and 23 deletions
  1. +21
    -21
      MQTTnet.Core/Client/MqttClient.cs
  2. +3
    -2
      MQTTnet.nuspec

+ 21
- 21
MQTTnet.Core/Client/MqttClient.cs View File

@@ -191,34 +191,30 @@ namespace MQTTnet.Core.Client
} }
} }


private async void ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
{ {
try try
{ {
if (mqttPacket is MqttPingReqPacket) if (mqttPacket is MqttPingReqPacket)
{ {
await SendAsync(new MqttPingRespPacket());
return;
return SendAsync(new MqttPingRespPacket());
} }


if (mqttPacket is MqttDisconnectPacket) if (mqttPacket is MqttDisconnectPacket)
{ {
await DisconnectAsync();
return;
return DisconnectAsync();
} }


var publishPacket = mqttPacket as MqttPublishPacket; var publishPacket = mqttPacket as MqttPublishPacket;
if (publishPacket != null) if (publishPacket != null)
{ {
await ProcessReceivedPublishPacket(publishPacket);
return;
return ProcessReceivedPublishPacket(publishPacket);
} }


var pubRelPacket = mqttPacket as MqttPubRelPacket; var pubRelPacket = mqttPacket as MqttPubRelPacket;
if (pubRelPacket != null) if (pubRelPacket != null)
{ {
await ProcessReceivedPubRelPacket(pubRelPacket);
return;
return ProcessReceivedPubRelPacket(pubRelPacket);
} }


_packetDispatcher.Dispatch(mqttPacket); _packetDispatcher.Dispatch(mqttPacket);
@@ -227,6 +223,8 @@ namespace MQTTnet.Core.Client
{ {
MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet."); MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet.");
} }

return Task.FromResult(0);
} }


private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
@@ -246,25 +244,27 @@ namespace MQTTnet.Core.Client
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage));
} }


private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
{ {
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{ {
FireApplicationMessageReceivedEvent(publishPacket); FireApplicationMessageReceivedEvent(publishPacket);
return Task.FromResult(0);
} }
else

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{ {
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
FireApplicationMessageReceivedEvent(publishPacket);
await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
_pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket;
await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}
FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier });
} }

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
_pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket;
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}

throw new InvalidOperationException();
} }


private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)


+ 3
- 2
MQTTnet.nuspec View File

@@ -12,8 +12,9 @@
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description> <description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Any exception while accessing the underlying data source is now rethrown as "MqttCommunicationException". <releaseNotes>* [Core] Any exception while accessing the underlying data source is now rethrown as "MqttCommunicationException".
* [Core] Extended exception information when an invalid packet type is received. * [Core] Extended exception information when an invalid packet type is received.
* [Server] Added TLS 1.2 support (thanks to Zazzmatazz)
* [Client] Added TLS 1.2 support (thanks to Zazzmatazz)</releaseNotes>
* [Server] Added TLS 1.2 support (but not for UWP) (thanks to Zazzmatazz)
* [Client] Added TLS 1.2 support (thanks to Zazzmatazz)
* [Core] Optimized async task management</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright> <copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker</tags> <tags>MQTT MQTTClient MQTTServer MQTTBroker Broker</tags>
</metadata> </metadata>


Loading…
Cancel
Save