Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 

162 rader
6.5 KiB

  1. using MQTTnet.Client;
  2. using MQTTnet.Exceptions;
  3. using MQTTnet.Extensions.Rpc.Options;
  4. using MQTTnet.Extensions.Rpc.Options.TopicGeneration;
  5. using MQTTnet.Protocol;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  namespace MQTTnet.Extensions.Rpc
  {
      /// <summary>
      /// Request/response ("RPC") helper on top of an <see cref="IMqttClient"/>: a request is
      /// published to a request topic and the payload of the first message received on the matching
      /// response topic is returned to the caller.
      /// </summary>
      /// <remarks>
      /// The client replaces <see cref="IMqttClient.ApplicationMessageReceivedHandler"/> with a
      /// wrapping handler for its lifetime and restores the previous handler in <see cref="Dispose"/>.
      /// </remarks>
      public sealed class MqttRpcClient : IMqttRpcClient
      {
          // Calls that are still waiting for a response, keyed by their response topic.
          readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();

          readonly IMqttClient _mqttClient;
          readonly IMqttRpcClientOptions _options;

          // Wrapper installed on the MQTT client; it keeps a reference to the handler it replaced.
          readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler;

          /// <summary>
          /// Creates an RPC client that uses a default-constructed <see cref="MqttRpcClientOptions"/>.
          /// </summary>
          /// <param name="mqttClient">The MQTT client used for all communication.</param>
          [Obsolete("Use MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options).")]
          public MqttRpcClient(IMqttClient mqttClient) : this(mqttClient, new MqttRpcClientOptions())
          {
          }

          /// <summary>
          /// Creates an RPC client and installs its message handler on <paramref name="mqttClient"/>.
          /// </summary>
          /// <param name="mqttClient">The MQTT client used for all communication.</param>
          /// <param name="options">Options providing (among others) the topic generation strategy.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="mqttClient"/> or <paramref name="options"/> is <c>null</c>.
          /// </exception>
          public MqttRpcClient(IMqttClient mqttClient, IMqttRpcClientOptions options)
          {
              _mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
              _options = options ?? throw new ArgumentNullException(nameof(options));

              // Wrap whatever handler is currently installed so it can be restored in Dispose.
              _applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler(
                  mqttClient.ApplicationMessageReceivedHandler,
                  HandleApplicationMessageReceivedAsync);

              _mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler;
          }

          /// <summary>
          /// Executes an RPC call with a string payload (sent as UTF-8) and no external cancellation.
          /// </summary>
          /// <param name="timeout">Maximum time to wait for the response after the request was published.</param>
          /// <param name="methodName">Name of the remote method; used to generate the topics.</param>
          /// <param name="payload">Request payload; encoded with UTF-8.</param>
          /// <param name="qualityOfServiceLevel">QoS level used for the subscription and the request.</param>
          /// <returns>The raw payload of the response message.</returns>
          public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
          {
              return ExecuteAsync(timeout, methodName, Encoding.UTF8.GetBytes(payload), qualityOfServiceLevel, CancellationToken.None);
          }

          /// <summary>
          /// Executes an RPC call with a string payload (sent as UTF-8).
          /// </summary>
          /// <param name="timeout">Maximum time to wait for the response after the request was published.</param>
          /// <param name="methodName">Name of the remote method; used to generate the topics.</param>
          /// <param name="payload">Request payload; encoded with UTF-8.</param>
          /// <param name="qualityOfServiceLevel">QoS level used for the subscription and the request.</param>
          /// <param name="cancellationToken">Token that aborts waiting for the response.</param>
          /// <returns>The raw payload of the response message.</returns>
          public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken)
          {
              return ExecuteAsync(timeout, methodName, Encoding.UTF8.GetBytes(payload), qualityOfServiceLevel, cancellationToken);
          }

          /// <summary>
          /// Executes an RPC call with a binary payload and no external cancellation.
          /// </summary>
          /// <param name="timeout">Maximum time to wait for the response after the request was published.</param>
          /// <param name="methodName">Name of the remote method; used to generate the topics.</param>
          /// <param name="payload">Request payload.</param>
          /// <param name="qualityOfServiceLevel">QoS level used for the subscription and the request.</param>
          /// <returns>The raw payload of the response message.</returns>
          public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
          {
              return ExecuteAsync(timeout, methodName, payload, qualityOfServiceLevel, CancellationToken.None);
          }

          /// <summary>
          /// Executes an RPC call: subscribes to the response topic, publishes the request and waits
          /// until a response arrives, the timeout elapses or <paramref name="cancellationToken"/> is
          /// cancelled. The response topic is unsubscribed again before the method completes.
          /// </summary>
          /// <param name="timeout">Maximum time to wait for the response after the request was published.</param>
          /// <param name="methodName">Name of the remote method; passed to the topic generation strategy.</param>
          /// <param name="payload">Request payload.</param>
          /// <param name="qualityOfServiceLevel">QoS level used for the subscription and the request.</param>
          /// <param name="cancellationToken">Token that aborts waiting for the response.</param>
          /// <returns>The raw payload of the response message.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="methodName"/> is <c>null</c>.</exception>
          /// <exception cref="InvalidOperationException">
          /// The MQTT client's message handler is no longer the RPC-aware handler, or another call is
          /// already waiting on the same response topic.
          /// </exception>
          /// <exception cref="MqttProtocolViolationException">The generated request or response topic is empty.</exception>
          /// <exception cref="MqttCommunicationTimedOutException">No response arrived within <paramref name="timeout"/>.</exception>
          /// <exception cref="OperationCanceledException">
          /// Waiting was cancelled via <paramref name="cancellationToken"/> or by <see cref="Dispose"/>.
          /// </exception>
          public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken)
          {
              if (methodName == null) throw new ArgumentNullException(nameof(methodName));

              // Responses are only routed to this client while its wrapper is installed.
              if (!(_mqttClient.ApplicationMessageReceivedHandler is RpcAwareApplicationMessageReceivedHandler))
              {
                  throw new InvalidOperationException("The application message received handler was modified.");
              }

              var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(new TopicGenerationContext
              {
                  MethodName = methodName,
                  QualityOfServiceLevel = qualityOfServiceLevel,
                  MqttClient = _mqttClient,
                  Options = _options
              });

              var requestTopic = topicNames.RequestTopic;
              var responseTopic = topicNames.ResponseTopic;

              if (string.IsNullOrWhiteSpace(requestTopic))
              {
                  throw new MqttProtocolViolationException("RPC request topic is empty.");
              }

              if (string.IsNullOrWhiteSpace(responseTopic))
              {
                  throw new MqttProtocolViolationException("RPC response topic is empty.");
              }

              var requestMessage = new MqttApplicationMessageBuilder()
                  .WithTopic(requestTopic)
                  .WithPayload(payload)
                  .WithQualityOfServiceLevel(qualityOfServiceLevel)
                  .Build();

              var tcs = new TaskCompletionSource<byte[]>();

              // Register OUTSIDE the try block: if the topic is already taken by another pending call,
              // the cleanup in finally must not run, because it would remove that call's entry and
              // unsubscribe the topic it is still waiting on.
              if (!_waitingCalls.TryAdd(responseTopic, tcs))
              {
                  throw new InvalidOperationException($"Another RPC call is already waiting for a response on topic '{responseTopic}'.");
              }

              try
              {
                  // Subscribe first so the response cannot arrive before the subscription exists.
                  await _mqttClient.SubscribeAsync(responseTopic, qualityOfServiceLevel).ConfigureAwait(false);
                  await _mqttClient.PublishAsync(requestMessage).ConfigureAwait(false);

                  using (var timeoutCts = new CancellationTokenSource(timeout))
                  using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token))
                  using (linkedCts.Token.Register(() => tcs.TrySetCanceled()))
                  {
                      try
                      {
                          return await tcs.Task.ConfigureAwait(false);
                      }
                      catch (OperationCanceledException exception) when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
                      {
                          // Only the timeout fired: report it as a timeout instead of a cancellation.
                          throw new MqttCommunicationTimedOutException(exception);
                      }
                  }
              }
              finally
              {
                  // The entry is already gone when a response was received; TryRemove is then a no-op.
                  _waitingCalls.TryRemove(responseTopic, out _);
                  await _mqttClient.UnsubscribeAsync(responseTopic).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Completes the pending call (if any) whose response topic equals the topic of the
          /// received message. Messages on other topics are ignored here.
          /// </summary>
          /// <param name="eventArgs">The received application message.</param>
          /// <returns>An already completed task.</returns>
          Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
          {
              // Removing the entry first guarantees that only the first response completes the call.
              if (_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
              {
                  tcs.TrySetResult(eventArgs.ApplicationMessage.Payload);
              }

              return Task.FromResult(0);
          }

          /// <summary>
          /// Restores the message handler that was installed before this client was created and
          /// cancels all calls that are still waiting for a response.
          /// </summary>
          public void Dispose()
          {
              _mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler.OriginalHandler;

              foreach (var waitingCall in _waitingCalls)
              {
                  waitingCall.Value.TrySetCanceled();
              }

              _waitingCalls.Clear();
          }
      }
  }