You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

50 line
2.0 KiB

  1. using System;
  2. using System.Threading.Tasks;
  3. using IronPython.Runtime;
  4. using Microsoft.Extensions.Logging;
  5. using MQTTnet.Server.Scripting;
  6. namespace MQTTnet.Server.Mqtt
  7. {
  /// <summary>
  /// Subscription interceptor that passes each subscription request to the Python
  /// script host and copies the script's accept/close decision back onto the context.
  /// </summary>
  public class MqttSubscriptionInterceptor : IMqttServerSubscriptionInterceptor
  {
      private readonly PythonScriptHostService _pythonScriptHostService;
      private readonly ILogger _logger;

      /// <summary>
      /// Creates the interceptor.
      /// </summary>
      /// <param name="pythonScriptHostService">Script host used to invoke the Python hook.</param>
      /// <param name="logger">Logger that receives any error raised while intercepting.</param>
      /// <exception cref="ArgumentNullException">Thrown when either argument is <c>null</c>.</exception>
      public MqttSubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger<MqttSubscriptionInterceptor> logger)
      {
          if (pythonScriptHostService is null)
          {
              throw new ArgumentNullException(nameof(pythonScriptHostService));
          }

          if (logger is null)
          {
              throw new ArgumentNullException(nameof(logger));
          }

          _pythonScriptHostService = pythonScriptHostService;
          _logger = logger;
      }

      /// <summary>
      /// Runs the <c>on_intercept_subscription</c> script function for the given
      /// subscription. Any exception raised along the way is logged and not rethrown.
      /// </summary>
      /// <param name="context">The subscription being intercepted; its
      /// <c>AcceptSubscription</c> and <c>CloseConnection</c> values are overwritten
      /// with the values found in the script dictionary after the call.</param>
      /// <returns>An already completed task.</returns>
      public Task InterceptSubscriptionAsync(MqttSubscriptionInterceptorContext context)
      {
          try
          {
              ApplyScriptDecision(context);
          }
          catch (Exception exception)
          {
              _logger.LogError(exception, "Error while intercepting subscription.");
          }

          return Task.CompletedTask;
      }

      // Builds the dictionary handed to the script, invokes the hook and writes the
      // two decision flags back to the context.
      private void ApplyScriptDecision(MqttSubscriptionInterceptorContext context)
      {
          // Looked up first so a missing or differently typed entry fails before the hook runs.
          var wrappedSessionItems = (PythonDictionary)context.SessionItems[MqttServerConnectionValidator.WrappedSessionItemsKey];

          var scriptArguments = new PythonDictionary
          {
              { "client_id", context.ClientId },
              { "session_items", wrappedSessionItems },
              { "topic", context.TopicFilter.Topic },
              { "qos", (int)context.TopicFilter.QualityOfServiceLevel },
              { "accept_subscription", context.AcceptSubscription },
              { "close_connection", context.CloseConnection }
          };

          // NOTE(review): presumably a no-op when the script does not define the
          // function - confirm against PythonScriptHostService.
          _pythonScriptHostService.InvokeOptionalFunction("on_intercept_subscription", scriptArguments);

          // Only the two decision flags are read back; "topic" and "qos" are not.
          context.AcceptSubscription = (bool)scriptArguments["accept_subscription"];
          context.CloseConnection = (bool)scriptArguments["close_connection"];
      }
  }
  42. }