Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

103 řádky
3.9 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading.Tasks;
  4. using MQTTnet.Core.Exceptions;
  5. using MQTTnet.Core.Internal;
  6. using MQTTnet.Core.Packets;
  7. using Microsoft.Extensions.Logging;
  8. namespace MQTTnet.Core.Client
  9. {
  /// <summary>
  /// Matches incoming packets with callers that are waiting for a response packet.
  /// Awaiters are keyed by the response packet type and, when the request implements
  /// <see cref="IMqttPacketWithIdentifier"/>, additionally by the packet identifier.
  /// </summary>
  public class MqttPacketDispatcher
  {
      // Awaiters for responses to requests that carry no packet identifier (one per response type).
      private readonly ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>> _packetByResponseType = new ConcurrentDictionary<Type, TaskCompletionSource<MqttBasePacket>>();

      // Awaiters for responses to requests that carry a packet identifier (response type -> identifier -> awaiter).
      private readonly ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>> _packetByResponseTypeAndIdentifier = new ConcurrentDictionary<Type, ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>>();

      private readonly ILogger<MqttPacketDispatcher> _logger;

      /// <summary>
      /// Creates a dispatcher that reports wait timeouts to the given logger.
      /// </summary>
      /// <param name="logger">Logger used for timeout warnings.</param>
      /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
      public MqttPacketDispatcher(ILogger<MqttPacketDispatcher> logger)
      {
          _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      }

      /// <summary>
      /// Registers an awaiter for a packet of <paramref name="responseType"/> and waits until
      /// <see cref="Dispatch"/> delivers a matching packet or the wait times out.
      /// </summary>
      /// <param name="request">The request whose response is awaited; its packet identifier (if any) is part of the lookup key.</param>
      /// <param name="responseType">The exact runtime type of the expected response packet.</param>
      /// <param name="timeout">Maximum time to wait, passed to <c>TimeoutAfter</c>.</param>
      /// <returns>The packet handed to <see cref="Dispatch"/> for this awaiter.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="request"/> or <paramref name="responseType"/> is null.</exception>
      /// <exception cref="MqttCommunicationTimedOutException">Logged as a warning and rethrown (presumably raised by <c>TimeoutAfter</c> when the timeout elapses).</exception>
      public async Task<MqttBasePacket> WaitForPacketAsync(MqttBasePacket request, Type responseType, TimeSpan timeout)
      {
          if (request == null) throw new ArgumentNullException(nameof(request));
          // Fail with the correct parameter name instead of a null-key error from the dictionary.
          if (responseType == null) throw new ArgumentNullException(nameof(responseType));

          var packetAwaiter = AddPacketAwaiter(request, responseType);
          try
          {
              return await packetAwaiter.Task.TimeoutAfter(timeout).ConfigureAwait(false);
          }
          catch (MqttCommunicationTimedOutException)
          {
              _logger.LogWarning("Timeout while waiting for packet of type '{0}'.", responseType.Name);
              throw;
          }
          finally
          {
              // Only remove our own registration; a newer waiter may have replaced it in the meantime.
              RemovePacketAwaiter(request, responseType, packetAwaiter);
          }
      }

      /// <summary>
      /// Completes the awaiter registered for the given packet, if there is one.
      /// Packets implementing <see cref="IMqttPacketWithIdentifier"/> are looked up by type and
      /// packet identifier only; all other packets are looked up by type.
      /// </summary>
      /// <param name="packet">The received packet.</param>
      /// <exception cref="ArgumentNullException"><paramref name="packet"/> is null.</exception>
      /// <exception cref="InvalidOperationException">No awaiter is registered for the packet.</exception>
      public void Dispatch(MqttBasePacket packet)
      {
          if (packet == null) throw new ArgumentNullException(nameof(packet));

          var type = packet.GetType();

          if (packet is IMqttPacketWithIdentifier withIdentifier)
          {
              if (_packetByResponseTypeAndIdentifier.TryGetValue(type, out var awaitersById))
              {
                  if (awaitersById.TryRemove(withIdentifier.PacketIdentifier, out var awaiterById))
                  {
                      awaiterById.TrySetResult(packet);
                      return;
                  }
              }
          }
          else if (_packetByResponseType.TryRemove(type, out var awaiterByType))
          {
              awaiterByType.TrySetResult(packet);
              return;
          }

          throw new InvalidOperationException($"Packet of type '{type.Name}' not handled or dispatched.");
      }

      /// <summary>
      /// Drops all registered awaiters. The awaiters are not completed or cancelled here.
      /// </summary>
      public void Reset()
      {
          // NOTE(review): pending WaitForPacketAsync calls presumably end via their timeout - confirm.
          _packetByResponseTypeAndIdentifier.Clear();
          _packetByResponseType.Clear();
      }

      /// <summary>
      /// Creates a new awaiter and registers it under the key derived from the request and
      /// response type. An awaiter already registered under the same key is replaced.
      /// </summary>
      private TaskCompletionSource<MqttBasePacket> AddPacketAwaiter(MqttBasePacket request, Type responseType)
      {
          var tcs = new TaskCompletionSource<MqttBasePacket>();

          if (request is IMqttPacketWithIdentifier requestWithIdentifier)
          {
              var byId = _packetByResponseTypeAndIdentifier.GetOrAdd(responseType, key => new ConcurrentDictionary<ushort, TaskCompletionSource<MqttBasePacket>>());
              byId[requestWithIdentifier.PacketIdentifier] = tcs;
          }
          else
          {
              _packetByResponseType[responseType] = tcs;
          }

          return tcs;
      }

      /// <summary>
      /// Removes the registration created for the given request, but only if the registered
      /// awaiter is still <paramref name="awaiter"/>. Never creates new dictionary entries.
      /// </summary>
      private void RemovePacketAwaiter(MqttBasePacket request, Type responseType, TaskCompletionSource<MqttBasePacket> awaiter)
      {
          if (request is IMqttPacketWithIdentifier requestWithIdentifier)
          {
              // TryGetValue instead of GetOrAdd: a removal must not insert an empty per-type dictionary.
              if (_packetByResponseTypeAndIdentifier.TryGetValue(responseType, out var byId))
              {
                  RemoveIfSame(byId, requestWithIdentifier.PacketIdentifier, awaiter);
              }
          }
          else
          {
              RemoveIfSame(_packetByResponseType, responseType, awaiter);
          }
      }

      /// <summary>
      /// Removes <paramref name="key"/> only while it still maps to <paramref name="awaiter"/>.
      /// </summary>
      private static void RemoveIfSame<TKey>(ConcurrentDictionary<TKey, TaskCompletionSource<MqttBasePacket>> awaiters, TKey key, TaskCompletionSource<MqttBasePacket> awaiter)
      {
          // ConcurrentDictionary's explicit ICollection<KeyValuePair<,>>.Remove compares key and value
          // and removes the entry in a single atomic operation.
          var entry = new System.Collections.Generic.KeyValuePair<TKey, TaskCompletionSource<MqttBasePacket>>(key, awaiter);
          ((System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<TKey, TaskCompletionSource<MqttBasePacket>>>)awaiters).Remove(entry);
      }
  }
  91. }