You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

92 rivejä
3.0 KiB

  1. using System;
  2. using System.Threading.Tasks;
  3. using MQTTnet.Core.Channel;
  4. using MQTTnet.Core.Client;
  5. using MQTTnet.Core.Diagnostics;
  6. using MQTTnet.Core.Exceptions;
  7. using MQTTnet.Core.Packets;
  8. using MQTTnet.Core.Serializer;
  9. namespace MQTTnet.Core.Adapter
  10. {
  /// <summary>
  /// Communication adapter that sends and receives MQTT packets over an
  /// <see cref="IMqttCommunicationChannel"/> using an <see cref="IMqttPacketSerializer"/>.
  /// </summary>
  public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter
  {
      private readonly IMqttCommunicationChannel _channel;

      /// <summary>
      /// Creates an adapter on top of the given channel and serializer.
      /// </summary>
      /// <param name="channel">The channel used to connect, disconnect and transfer packets.</param>
      /// <param name="serializer">The serializer used to write packets to and read packets from the channel.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="channel"/> or <paramref name="serializer"/> is <c>null</c>.
      /// </exception>
      public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer)
      {
          _channel = channel ?? throw new ArgumentNullException(nameof(channel));
          PacketSerializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
      }

      /// <summary>
      /// Gets the serializer used by this adapter.
      /// </summary>
      public IMqttPacketSerializer PacketSerializer { get; }

      /// <summary>
      /// Connects the underlying channel, failing if the connect does not finish within <paramref name="timeout"/>.
      /// </summary>
      /// <param name="options">The client options forwarded to the channel.</param>
      /// <param name="timeout">The maximum time to wait for the connect to finish.</param>
      /// <exception cref="MqttCommunicationTimedOutException">The timeout elapsed first.</exception>
      /// <exception cref="MqttCommunicationException">The channel's connect operation faulted.</exception>
      public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout)
      {
          await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false);
      }

      /// <summary>
      /// Disconnects the underlying channel. No timeout is applied.
      /// </summary>
      public async Task DisconnectAsync()
      {
          await _channel.DisconnectAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Traces the packet and serializes it to the channel, failing if that does not finish within
      /// <paramref name="timeout"/>.
      /// </summary>
      /// <param name="packet">The packet to send.</param>
      /// <param name="timeout">The maximum time to wait for the send to finish.</param>
      /// <exception cref="ArgumentNullException"><paramref name="packet"/> is <c>null</c>.</exception>
      /// <exception cref="MqttCommunicationTimedOutException">The timeout elapsed first.</exception>
      /// <exception cref="MqttCommunicationException">The serialization operation faulted.</exception>
      public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout)
      {
          if (packet == null)
          {
              throw new ArgumentNullException(nameof(packet));
          }

          MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");
          await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false);
      }

      /// <summary>
      /// Reads the next packet from the channel and traces it.
      /// </summary>
      /// <param name="timeout">
      /// The maximum time to wait for a packet. A value that is not greater than
      /// <see cref="TimeSpan.Zero"/> disables the timeout, so the call waits until a packet arrives.
      /// </param>
      /// <returns>The received packet; never <c>null</c>.</returns>
      /// <exception cref="MqttCommunicationTimedOutException">A positive timeout elapsed first.</exception>
      /// <exception cref="MqttCommunicationException">The read faulted while a positive timeout was in effect.</exception>
      /// <exception cref="MqttProtocolViolationException">The serializer produced no packet.</exception>
      public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout)
      {
          MqttBasePacket packet;
          if (timeout > TimeSpan.Zero)
          {
              packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false);
          }
          else
          {
              // No timeout requested: await the read directly (its exceptions are not wrapped).
              packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false);
          }

          if (packet == null)
          {
              throw new MqttProtocolViolationException("Received malformed packet.");
          }

          MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}");
          return packet;
      }

      /// <summary>
      /// Awaits <paramref name="task"/> and returns its result, enforcing <paramref name="timeout"/>.
      /// </summary>
      private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout)
      {
          await WaitWithTimeoutAsync(task, timeout).ConfigureAwait(false);

          // The task is complete and not faulted here; awaiting (instead of reading Result)
          // surfaces a cancellation as TaskCanceledException rather than AggregateException.
          return await task.ConfigureAwait(false);
      }

      /// <summary>
      /// Awaits <paramref name="task"/>, enforcing <paramref name="timeout"/>.
      /// </summary>
      private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout)
      {
          await WaitWithTimeoutAsync(task, timeout).ConfigureAwait(false);

          // The task is complete and not faulted here; awaiting it propagates a cancellation
          // instead of silently reporting success.
          await task.ConfigureAwait(false);
      }

      /// <summary>
      /// Waits until <paramref name="task"/> completes or <paramref name="timeout"/> elapses.
      /// Throws <see cref="MqttCommunicationTimedOutException"/> on timeout and
      /// <see cref="MqttCommunicationException"/> (wrapping the task's exception) when the task faulted.
      /// </summary>
      private static async Task WaitWithTimeoutAsync(Task task, TimeSpan timeout)
      {
          using (var timeoutCancellation = new System.Threading.CancellationTokenSource())
          {
              var timeoutTask = Task.Delay(timeout, timeoutCancellation.Token);

              // Argument order is kept from the original: if both are already complete, the timeout wins.
              var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);
              if (finishedTask == timeoutTask)
              {
                  // The operation keeps running unattended; make sure a late failure is observed.
                  ObserveFault(task);
                  throw new MqttCommunicationTimedOutException();
              }

              // The operation finished first: release the pending delay timer right away.
              timeoutCancellation.Cancel();
          }

          if (task.IsFaulted)
          {
              throw new MqttCommunicationException(task.Exception);
          }
      }

      /// <summary>
      /// Attaches a continuation that reads the exception of <paramref name="task"/> if it faults,
      /// so that an abandoned operation does not end up as an unobserved task exception.
      /// </summary>
      private static void ObserveFault(Task task)
      {
          task.ContinueWith(
              t => { var ignored = t.Exception; },
              System.Threading.CancellationToken.None,
              TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
              TaskScheduler.Default);
      }
  }
  77. }