You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

159 lines
6.0 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Exceptions;
  3. using MQTTnet.Extensions.Rpc.Options;
  4. using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
  5. using MQTTnet.Protocol;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using MQTTnet.Client.Subscribing;
  11. using MQTTnet.Implementations;
  12. namespace MQTTnet.Extensions.Rpc
  13. {
  14. public sealed class MqttRpcClient : IMqttRpcClient
  15. {
  16. readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();
  17. readonly IMqttClient _mqttClient;
  18. readonly IMqttRpcClientOptions _options;
  19. readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler;
  20. [Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")]
  21. public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions())
  22. {
  23. }
  24. public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options)
  25. {
  26. _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
  27. _options = options ?? throw new ArgumentNullException(nameof(options));
  28. _applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler(
  29. mqttClient.ApplicationMessageReceivedHandler,
  30. HandleApplicationMessageReceivedAsync);
  31. _mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler;
  32. }
  33. public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
  34. {
  35. using (var timeoutToken = new CancellationTokenSource(timeout))
  36. {
  37. try
  38. {
  39. return await ExecuteAsync(methodName, payload, qualityOfServiceLevel, timeoutToken.Token).ConfigureAwait(false);
  40. }
  41. catch (OperationCanceledException exception)
  42. {
  43. if (timeoutToken.IsCancellationRequested)
  44. {
  45. throw new MqttCommunicationTimedOutException(exception);
  46. }
  47. throw;
  48. }
  49. }
  50. }
  51. public async Task<byte[]> ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken)
  52. {
  53. if (methodName == null) throw new ArgumentNullException(nameof(methodName));
  54. if (!(_mqttClient.ApplicationMessageReceivedHandler is RpcAwareApplicationMessageReceivedHandler))
  55. {
  56. throw new InvalidOperationException("The application message received handler was modified.");
  57. }
  58. var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext
  59. {
  60. MethodName = methodName,
  61. QualityOfServiceLevel = qualityOfServiceLevel,
  62. MqttClient = _mqttClient,
  63. Options = _options
  64. });
  65. var requestTopic = topicNames.RequestTopic;
  66. var responseTopic = topicNames.ResponseTopic;
  67. if (string.IsNullOrWhiteSpace(requestTopic))
  68. {
  69. throw new MqttProtocolViolationException("RPC request topic is empty.");
  70. }
  71. if (string.IsNullOrWhiteSpace(responseTopic))
  72. {
  73. throw new MqttProtocolViolationException("RPC response topic is empty.");
  74. }
  75. var requestMessage = new MqttApplicationMessageBuilder()
  76. .WithTopic(requestTopic)
  77. .WithPayload(payload)
  78. .WithQualityOfServiceLevel(qualityOfServiceLevel)
  79. .Build();
  80. try
  81. {
  82. #if NET452
  83. var promise = new TaskCompletionSource<byte[]>();
  84. #else
  85. var promise = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
  86. #endif
  87. if (!_waitingCalls.TryAdd(responseTopic, promise))
  88. {
  89. throw new InvalidOperationException();
  90. }
  91. var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
  92. .WithTopicFilter(responseTopic, qualityOfServiceLevel)
  93. .Build();
  94. await _mqttClient.SubscribeAsync(subscribeOptions, cancellationToken).ConfigureAwait(false);
  95. await _mqttClient.PublishAsync(requestMessage, cancellationToken).ConfigureAwait(false);
  96. using (cancellationToken.Register(() => { promise.TrySetCanceled(); }))
  97. {
  98. return await promise.Task.ConfigureAwait(false);
  99. }
  100. }
  101. finally
  102. {
  103. _waitingCalls.TryRemove(responseTopic, out _);
  104. await _mqttClient.UnsubscribeAsync(responseTopic).ConfigureAwait(false);
  105. }
  106. }
  107. Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
  108. {
  109. if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var promise))
  110. {
  111. return PlatformAbstractionLayer.CompletedTask;
  112. }
  113. #if NET452
  114. Task.Run(() => promise.TrySetResult(eventArgs.ApplicationMessage.Payload));
  115. #else
  116. promise.TrySetResult(eventArgs.ApplicationMessage.Payload);
  117. #endif
  118. // Set this message to handled to that other code can avoid execution etc.
  119. eventArgs.IsHandled = true;
  120. return PlatformAbstractionLayer.CompletedTask;
  121. }
  122. public void Dispose()
  123. {
  124. _mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler.OriginalHandler;
  125. foreach (var tcs in _waitingCalls)
  126. {
  127. tcs.Value.TrySetCanceled();
  128. }
  129. _waitingCalls.Clear();
  130. }
  131. }
  132. }