You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MqttRpcClient.cs 5.0 KiB

6 jaren geleden
6 jaren geleden
6 jaren geleden
6 jaren geleden
6 jaren geleden
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Text;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MQTTnet.Client;
  7. using MQTTnet.Exceptions;
  8. using MQTTnet.Protocol;
  9. namespace MQTTnet.Extensions.Rpc
  10. {
  /// <summary>
  /// Request/response (RPC) helper on top of an <see cref="IMqttClient"/>.
  /// Every call publishes the request to a unique topic of the form
  /// <c>MQTTnet.RPC/{guid}/{methodName}</c> and waits for a single message on
  /// the same topic with the suffix <c>/response</c>.
  /// </summary>
  public sealed class MqttRpcClient : IDisposable
  {
      // Pending calls keyed by the response topic they are waiting on.
      private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();
      private readonly IMqttClient _mqttClient;

      /// <summary>
      /// Creates the RPC client and starts listening for application messages
      /// received by <paramref name="mqttClient"/>.
      /// </summary>
      /// <param name="mqttClient">The MQTT client used to subscribe, publish and receive.</param>
      /// <exception cref="ArgumentNullException"><paramref name="mqttClient"/> is null.</exception>
      public MqttRpcClient(IMqttClient mqttClient)
      {
          _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

          _mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
      }

      /// <summary>
      /// Executes an RPC call with a UTF-8 encoded string payload and without external cancellation.
      /// </summary>
      /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
      public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
      {
          return ExecuteAsync(timeout, methodName, payload, qualityOfServiceLevel, CancellationToken.None);
      }

      /// <summary>
      /// Executes an RPC call with a UTF-8 encoded string payload.
      /// </summary>
      /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
      public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken)
      {
          // Fail with the caller's parameter name instead of the one used inside Encoding.GetBytes.
          if (payload == null) throw new ArgumentNullException(nameof(payload));

          return ExecuteAsync(timeout, methodName, Encoding.UTF8.GetBytes(payload), qualityOfServiceLevel, cancellationToken);
      }

      /// <summary>
      /// Executes an RPC call with a binary payload and without external cancellation.
      /// </summary>
      public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
      {
          return ExecuteAsync(timeout, methodName, payload, qualityOfServiceLevel, CancellationToken.None);
      }

      /// <summary>
      /// Subscribes to the response topic, publishes the request and waits for the response.
      /// </summary>
      /// <param name="timeout">Maximum time to wait for the response once the request has been published.</param>
      /// <param name="methodName">Name of the remote method; becomes the last level of the request topic.</param>
      /// <param name="payload">Request payload.</param>
      /// <param name="qualityOfServiceLevel">QoS level used for both the subscription and the request message.</param>
      /// <param name="cancellationToken">Token which cancels waiting for the response.</param>
      /// <returns>The payload of the response message.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="methodName"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="methodName"/> contains /, + or #.</exception>
      /// <exception cref="MqttCommunicationTimedOutException">The timeout elapsed before a response arrived.</exception>
      /// <exception cref="TaskCanceledException">
      /// <paramref name="cancellationToken"/> was canceled or this instance was disposed while waiting.
      /// </exception>
      public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken)
      {
          if (methodName == null) throw new ArgumentNullException(nameof(methodName));

          // The method name is a single topic level, so level separators and wildcards are rejected.
          if (methodName.Contains("/") || methodName.Contains("+") || methodName.Contains("#"))
          {
              throw new ArgumentException("The method name cannot contain /, + or #.");
          }

          var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}";
          var responseTopic = requestTopic + "/response";

          var requestMessage = new MqttApplicationMessageBuilder()
              .WithTopic(requestTopic)
              .WithPayload(payload)
              .WithQualityOfServiceLevel(qualityOfServiceLevel)
              .Build();

          // Register the call before entering the try block: if registration fails, the
          // cleanup below must not remove (or unsubscribe) an entry owned by another call.
          var tcs = new TaskCompletionSource<byte[]>();
          if (!_waitingCalls.TryAdd(responseTopic, tcs))
          {
              throw new InvalidOperationException("A call is already waiting on the generated response topic.");
          }

          try
          {
              // Subscribe first so the response cannot arrive before the subscription exists.
              await _mqttClient.SubscribeAsync(responseTopic, qualityOfServiceLevel).ConfigureAwait(false);
              await _mqttClient.PublishAsync(requestMessage).ConfigureAwait(false);

              using (var timeoutCts = new CancellationTokenSource(timeout))
              using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token))
              {
                  // TrySetCanceled is a no-op when the call has already completed.
                  linkedCts.Token.Register(() => tcs.TrySetCanceled());

                  try
                  {
                      return await tcs.Task.ConfigureAwait(false);
                  }
                  catch (TaskCanceledException taskCanceledException) when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
                  {
                      // Only the timeout fired: report it as a timeout rather than a cancellation.
                      throw new MqttCommunicationTimedOutException(taskCanceledException);
                  }
              }
          }
          finally
          {
              _waitingCalls.TryRemove(responseTopic, out _);
              await _mqttClient.UnsubscribeAsync(responseTopic).ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Completes the pending call (if any) whose response topic equals the topic of the received message.
      /// </summary>
      private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
      {
          if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
          {
              // Not a response this client is waiting for.
              return;
          }

          // No effect when the call was canceled or timed out in the meantime.
          tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
      }

      /// <summary>
      /// Stops listening to the MQTT client and cancels all pending calls.
      /// The instance must not be used for further calls afterwards because responses are no longer observed.
      /// </summary>
      public void Dispose()
      {
          // Detach first so the MQTT client no longer references (and invokes) this instance.
          _mqttClient.ApplicationMessageReceived -= OnApplicationMessageReceived;

          foreach (var waitingCall in _waitingCalls)
          {
              // A call may complete or time out concurrently; SetCanceled would throw in that case.
              waitingCall.Value.TrySetCanceled();
          }

          _waitingCalls.Clear();
      }
  }
  111. }