Browse Source

Fix async issues

release/3.x.x
Christian 6 years ago
parent
commit
1760dc7d35
1 changed files with 9 additions and 9 deletions
  1. +9
    -9
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs

+ 9
- 9
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs View File

@@ -145,16 +145,16 @@ namespace MQTTnet.Server
PendingMessagesQueue.Enqueue(publishPacket); PendingMessagesQueue.Enqueue(publishPacket);
} }


public Task SubscribeAsync(IList<TopicFilter> topicFilters)
public async Task SubscribeAsync(IList<TopicFilter> topicFilters)
{ {
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));


var response = SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket
await SubscriptionsManager.SubscribeAsync(new MqttSubscribePacket
{ {
TopicFilters = topicFilters TopicFilters = topicFilters
});
}).ConfigureAwait(false);


return response;
await EnqueueSubscribedRetainedMessagesAsync(topicFilters).ConfigureAwait(false);
} }


public Task UnsubscribeAsync(IList<string> topicFilters) public Task UnsubscribeAsync(IList<string> topicFilters)
@@ -270,7 +270,7 @@ namespace MQTTnet.Server
await StopAsync().ConfigureAwait(false); await StopAsync().ConfigureAwait(false);
} }


await EnqueueSubscribedRetainedMessagesAsync(subscribePacket).ConfigureAwait(false);
await EnqueueSubscribedRetainedMessagesAsync(subscribePacket.TopicFilters).ConfigureAwait(false);
} }


private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken) private async Task HandleIncomingUnsubscribePacketAsync(IMqttChannelAdapter adapter, MqttUnsubscribePacket unsubscribePacket, CancellationToken cancellationToken)
@@ -279,12 +279,12 @@ namespace MQTTnet.Server
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult); await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, unsubscribeResult);
} }


private async Task EnqueueSubscribedRetainedMessagesAsync(MqttSubscribePacket subscribePacket)
private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
{ {
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(subscribePacket);
foreach (var publishPacket in retainedMessages)
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters);
foreach (var applicationMessage in retainedMessages)
{ {
await EnqueueApplicationMessageAsync(publishPacket);
await EnqueueApplicationMessageAsync(applicationMessage).ConfigureAwait(false);
} }
} }




Loading…
Cancel
Save