diff --git a/MQTTnet.sln b/MQTTnet.sln index 033fe99..f2fff83 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -47,7 +47,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.ManagedC EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", "Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnetServer", "Source\MQTTnetServer\MQTTnetServer.csproj", "{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Server", "Source\MQTTnet.Server\MQTTnet.Server.csproj", "{5699FB8C-838C-4AB0-80A5-9CA809F9B65B}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/MQTTnet.sln.DotSettings b/MQTTnet.sln.DotSettings index 023e3d6..20d5d14 100644 --- a/MQTTnet.sln.DotSettings +++ b/MQTTnet.sln.DotSettings @@ -7,6 +7,7 @@ True True True + True True True True diff --git a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 65514d0..41039b5 100644 --- a/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -40,7 +40,6 @@ namespace MQTTnet.AspNetCore public void Dispose() { - StopAsync().GetAwaiter().GetResult(); } } } \ No newline at end of file diff --git a/Source/MQTTnet.Server/Configuration/EndpointConfiguration.cs b/Source/MQTTnet.Server/Configuration/EndpointConfiguration.cs new file mode 100644 index 0000000..b4b0496 --- /dev/null +++ b/Source/MQTTnet.Server/Configuration/EndpointConfiguration.cs @@ -0,0 +1,9 @@ +namespace MQTTnet.Server.Configuration +{ + public class EndpointConfiguration + { + public int Port { get; set; } = 1883; + + public int BacklogSize { get; set; } = 10; + } +} diff --git a/Source/MQTTnetServer/Settings/ListenModel.cs b/Source/MQTTnet.Server/Configuration/ListenModel.cs similarity index 93% rename from Source/MQTTnetServer/Settings/ListenModel.cs rename to Source/MQTTnet.Server/Configuration/ListenModel.cs index 5af984f..bf62a68 100644 --- a/Source/MQTTnetServer/Settings/ListenModel.cs +++ b/Source/MQTTnet.Server/Configuration/ListenModel.cs @@ -1,4 +1,4 @@ -namespace MQTTnetServer.Settings +namespace MQTTnet.Server.Configuration { /// /// Listen Entry Settings Model diff --git a/Source/MQTTnetServer/Settings/ListenProtocolTypes.cs b/Source/MQTTnet.Server/Configuration/ListenProtocolTypes.cs similarity index 89% rename from Source/MQTTnetServer/Settings/ListenProtocolTypes.cs rename to Source/MQTTnet.Server/Configuration/ListenProtocolTypes.cs index 5ca4d74..84a0a48 100644 --- a/Source/MQTTnetServer/Settings/ListenProtocolTypes.cs +++ b/Source/MQTTnet.Server/Configuration/ListenProtocolTypes.cs @@ -1,4 +1,4 @@ -namespace MQTTnetServer.Settings +namespace MQTTnet.Server.Configuration { /// /// Listen Protocol Types diff --git a/Source/MQTTnet.Server/Configuration/MqttNetServerConfiguration.cs b/Source/MQTTnet.Server/Configuration/MqttNetServerConfiguration.cs new file mode 100644 index 0000000..632b8d1 --- /dev/null +++ b/Source/MQTTnet.Server/Configuration/MqttNetServerConfiguration.cs @@ -0,0 +1,7 @@ +namespace MQTTnet.Server.Configuration +{ + public class MqttNetServerConfiguration + { + + } +} diff --git a/Source/MQTTnetServer/Settings/SettingsModel.cs b/Source/MQTTnet.Server/Configuration/SettingsModel.cs similarity index 87% rename from Source/MQTTnetServer/Settings/SettingsModel.cs rename to Source/MQTTnet.Server/Configuration/SettingsModel.cs index 267ace1..e393084 100644 --- a/Source/MQTTnetServer/Settings/SettingsModel.cs +++ b/Source/MQTTnet.Server/Configuration/SettingsModel.cs @@ -1,6 +1,6 @@ using System.Collections.Generic; -namespace MQTTnetServer.Settings +namespace MQTTnet.Server.Configuration { /// /// Main Settings Model diff --git a/Source/MQTTnetServer/Controllers/ValuesController.cs b/Source/MQTTnet.Server/Controllers/ValuesController.cs similarity index 86% rename from Source/MQTTnetServer/Controllers/ValuesController.cs rename to Source/MQTTnet.Server/Controllers/ValuesController.cs index 173fc34..bfcc398 100644 --- a/Source/MQTTnetServer/Controllers/ValuesController.cs +++ b/Source/MQTTnet.Server/Controllers/ValuesController.cs @@ -1,10 +1,7 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; +using System.Collections.Generic; using Microsoft.AspNetCore.Mvc; -namespace MQTTnetServer.Controllers +namespace MQTTnet.Server.Controllers { [Route("api/[controller]")] [ApiController] diff --git a/Source/MQTTnet.Server/LICENSE b/Source/MQTTnet.Server/LICENSE new file mode 100644 index 0000000..7d84e2b --- /dev/null +++ b/Source/MQTTnet.Server/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 Christian Kratky + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs new file mode 100644 index 0000000..7f35117 --- /dev/null +++ b/Source/MQTTnet.Server/Logging/MqttNetChildLoggerWrapper.cs @@ -0,0 +1,43 @@ +using System; +using MQTTnet.Diagnostics; + +namespace MQTTnet.Server.Logging +{ + public class MqttNetChildLoggerWrapper : IMqttNetChildLogger + { + private readonly MqttNetLoggerWrapper _logger; + private readonly string _source; + + public MqttNetChildLoggerWrapper(string source, MqttNetLoggerWrapper logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _source = source; + } + + public IMqttNetChildLogger CreateChildLogger(string source = null) + { + return _logger.CreateChildLogger(source); + } + + public void Verbose(string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Verbose, _source, message, parameters, null); + } + + public void Info(string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Info, _source, message, parameters, null); + } + + public void Warning(Exception exception, string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Warning, _source, message, parameters, exception); + } + + public void Error(Exception exception, string message, params object[] parameters) + { + _logger.Publish(MqttNetLogLevel.Error, _source, message, parameters, exception); + } + } +} diff --git a/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs new file mode 100644 index 0000000..2623451 --- /dev/null +++ b/Source/MQTTnet.Server/Logging/MqttNetLoggerWrapper.cs @@ -0,0 +1,48 @@ +using System; +using Microsoft.Extensions.Logging; +using MQTTnet.Diagnostics; + +namespace MQTTnet.Server.Logging +{ + public class MqttNetLoggerWrapper : IMqttNetLogger + { + private readonly ILogger _logger; + + public MqttNetLoggerWrapper(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public event EventHandler LogMessagePublished; + + public IMqttNetChildLogger CreateChildLogger(string source = null) + { + return new MqttNetChildLoggerWrapper(source, this); + } + + public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception) + { + var convertedLogLevel = ConvertLogLevel(logLevel); + + if (!_logger.IsEnabled(convertedLogLevel)) + { + return; + } + + _logger.Log(convertedLogLevel, exception, message, parameters); + } + + private static LogLevel ConvertLogLevel(MqttNetLogLevel logLevel) + { + switch (logLevel) + { + case MqttNetLogLevel.Error: return LogLevel.Error; + case MqttNetLogLevel.Warning: return LogLevel.Warning; + case MqttNetLogLevel.Info: return LogLevel.Information; + case MqttNetLogLevel.Verbose: return LogLevel.Trace; + } + + return LogLevel.Debug; + } + } +} diff --git a/Source/MQTTnetServer/MQTTnetServer.csproj b/Source/MQTTnet.Server/MQTTnet.Server.csproj similarity index 55% rename from Source/MQTTnetServer/MQTTnetServer.csproj rename to Source/MQTTnet.Server/MQTTnet.Server.csproj index 2dbf608..f142e23 100644 --- a/Source/MQTTnetServer/MQTTnetServer.csproj +++ b/Source/MQTTnet.Server/MQTTnet.Server.csproj @@ -3,11 +3,11 @@ netcoreapp2.2 InProcess - MQTTnetServer - MQTTnetServer + MQTTnet.Server + MQTTnet.Server False - + MQTTnet @@ -16,8 +16,12 @@ latest + + + + - MQTTnetServer.xml + NU1605 1701;1702 @@ -29,8 +33,12 @@ + + + + @@ -38,4 +46,16 @@ + + + Always + + + Always + + + Always + + + diff --git a/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs new file mode 100644 index 0000000..4af2a55 --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttApplicationMessageInterceptor.cs @@ -0,0 +1,50 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Protocol; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttApplicationMessageInterceptor : IMqttServerApplicationMessageInterceptor + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttApplicationMessageInterceptor(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task InterceptApplicationMessagePublishAsync(MqttApplicationMessageInterceptorContext context) + { + try + { + var pythonContext = new PythonDictionary + { + { "accept_publish", context.AcceptPublish }, + { "close_connection", context.CloseConnection }, + { "client_id", context.ClientId }, + { "topic", context.ApplicationMessage.Topic }, + { "qos", (int)context.ApplicationMessage.QualityOfServiceLevel }, + { "retain", context.ApplicationMessage.Retain } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_intercept_application_message", pythonContext); + + context.AcceptPublish = (bool)pythonContext.get("accept_publish", context.AcceptPublish); + context.CloseConnection = (bool)pythonContext.get("close_connection", context.CloseConnection); + context.ApplicationMessage.Topic = (string)pythonContext.get("topic", context.ApplicationMessage.Topic); + context.ApplicationMessage.QualityOfServiceLevel = (MqttQualityOfServiceLevel)(int)pythonContext.get("qos", (int)context.ApplicationMessage.QualityOfServiceLevel); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while intercepting application message."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs b/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs new file mode 100644 index 0000000..0471b47 --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttConnectionValidator.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Protocol; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttConnectionValidator : IMqttServerConnectionValidator + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttConnectionValidator(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task ValidateConnectionAsync(MqttConnectionValidatorContext context) + { + try + { + var pythonContext = new PythonDictionary + { + { "client_id", context.ClientId }, + { "endpoint", context.Endpoint }, + { "username", context.Username }, + { "password", context.Password }, + { "result", PythonConvert.Pythonfy(context.ReturnCode) } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_validate_client_connection", pythonContext); + + context.ReturnCode = PythonConvert.ParseEnum((string)pythonContext["result"]); + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while validating client connection."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttServerService.cs b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs new file mode 100644 index 0000000..6588ab5 --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttServerService.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Adapter; +using MQTTnet.AspNetCore; +using MQTTnet.Implementations; +using MQTTnet.Protocol; +using MQTTnet.Server.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttServerService + { + private readonly ILogger _logger; + + private readonly MqttConnectionValidator _mqttConnectionValidator; + private readonly MqttSubscriptionInterceptor _mqttSubscriptionInterceptor; + private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor; + private readonly PythonScriptHostService _pythonScriptHostService; + + private readonly IMqttServer _mqttServer; + + public MqttServerService( + IMqttServerFactory mqttServerFactory, + MqttWebSocketServerAdapter webSocketServerAdapter, + MqttNetLoggerWrapper mqttNetLogger, + MqttConnectionValidator mqttConnectionValidator, + MqttSubscriptionInterceptor mqttSubscriptionInterceptor, + MqttApplicationMessageInterceptor mqttApplicationMessageInterceptor, + PythonScriptHostService pythonScriptHostService, + ILogger logger) + { + _mqttConnectionValidator = mqttConnectionValidator ?? throw new ArgumentNullException(nameof(mqttConnectionValidator)); + _mqttSubscriptionInterceptor = mqttSubscriptionInterceptor ?? throw new ArgumentNullException(nameof(mqttSubscriptionInterceptor)); + _mqttApplicationMessageInterceptor = mqttApplicationMessageInterceptor ?? throw new ArgumentNullException(nameof(mqttApplicationMessageInterceptor)); + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + var adapters = new List + { + new MqttTcpServerAdapter(new MqttNetChildLoggerWrapper(null, mqttNetLogger)), + webSocketServerAdapter + }; + + _mqttServer = mqttServerFactory.CreateMqttServer(adapters); + } + + public void Configure() + { + _pythonScriptHostService.RegisterProxyObject("publish", new Action(Publish)); + + var options = new MqttServerOptionsBuilder() + .WithDefaultEndpoint() + .WithDefaultEndpointPort(1883) + .WithConnectionValidator(_mqttConnectionValidator) + .WithApplicationMessageInterceptor(_mqttApplicationMessageInterceptor) + .WithSubscriptionInterceptor(_mqttSubscriptionInterceptor) + .Build(); + + _mqttServer.StartAsync(options).GetAwaiter().GetResult(); + + _logger.LogInformation("MQTT server started."); + } + + private void Publish(PythonDictionary parameters) + { + var applicationMessageBuilder = new MqttApplicationMessageBuilder() + .WithTopic((string)parameters.get("topic", null)) + .WithRetainFlag((bool)parameters.get("retain", false)) + .WithQualityOfServiceLevel((MqttQualityOfServiceLevel)(int)parameters.get("qos", 0)); + + var payload = parameters.get("payload", null); + var binaryPayload = new byte[0]; + + if (payload is string stringPayload) + { + binaryPayload = Encoding.UTF8.GetBytes(stringPayload); + } + else if (payload is ByteArray byteArray) + { + binaryPayload = byteArray.ToArray(); + } + else if (payload is IEnumerable intArray) + { + binaryPayload = intArray.Select(Convert.ToByte).ToArray(); + } + + applicationMessageBuilder = applicationMessageBuilder + .WithPayload(binaryPayload); + + var applicationMessage = applicationMessageBuilder.Build(); + + _mqttServer.PublishAsync(applicationMessage).GetAwaiter().GetResult(); + _logger.LogInformation($"Published topic '{applicationMessage.Topic}' from server."); + } + } +} diff --git a/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs new file mode 100644 index 0000000..115e0ab --- /dev/null +++ b/Source/MQTTnet.Server/Mqtt/MqttSubscriptionInterceptor.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading.Tasks; +using IronPython.Runtime; +using Microsoft.Extensions.Logging; +using MQTTnet.Server.Scripting; + +namespace MQTTnet.Server.Mqtt +{ + public class MqttSubscriptionInterceptor : IMqttServerSubscriptionInterceptor + { + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public MqttSubscriptionInterceptor(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task InterceptSubscriptionAsync(MqttSubscriptionInterceptorContext context) + { + try + { + var pythonContext = new PythonDictionary + { + { "accept_subscription", context.AcceptSubscription }, + { "close_connection", context.CloseConnection }, + + { "client_id", context.ClientId }, + { "topic", context.TopicFilter.Topic }, + { "qos", (int)context.TopicFilter.QualityOfServiceLevel } + }; + + _pythonScriptHostService.InvokeOptionalFunction("on_intercept_subscription", pythonContext); + + context.AcceptSubscription = (bool)pythonContext["accept_subscription"]; + context.CloseConnection = (bool)pythonContext["close_connection"]; + } + catch (Exception exception) + { + _logger.LogError(exception, "Error while intercepting subscription."); + } + + return Task.CompletedTask; + } + } +} diff --git a/Source/MQTTnet.Server/Program.cs b/Source/MQTTnet.Server/Program.cs new file mode 100644 index 0000000..a9b3be6 --- /dev/null +++ b/Source/MQTTnet.Server/Program.cs @@ -0,0 +1,143 @@ +using System; +using System.Collections.Generic; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using MQTTnet.Server.Configuration; + +namespace MQTTnet.Server +{ + public static class Program + { + public static int Main(string[] args) + { + try + { + PrintLogo(); + + CreateWebHostBuilder(args).Build().Run(); + return 0; + } + catch (Exception exception) + { + Console.WriteLine(exception); + return -1; + } + } + + private static void PrintLogo() + { + Console.ForegroundColor = ConsoleColor.Red; + + Console.WriteLine(@" + __ __ ____ _______ _______ _ _____ + | \/ |/ __ \__ __|__ __| | | / ____| + | \ / | | | | | | | |_ __ ___| |_ | (___ ___ _ ____ _____ _ __ + | |\/| | | | | | | | | '_ \ / _ \ __| \___ \ / _ \ '__\ \ / / _ \ '__| + | | | | |__| | | | | | | | | __/ |_ ____) | __/ | \ V / __/ | + |_| |_|\___\_\ |_| |_|_| |_|\___|\__| |_____/ \___|_| \_/ \___|_|"); + + Console.ForegroundColor = ConsoleColor.White; + Console.WriteLine(@" + -- The official MQTT server implementation of MQTTnet -- + Copyright (c) 2017-2019 The MQTTnet Team"); + + Console.ForegroundColor = ConsoleColor.Blue; + Console.WriteLine(@" + https://github.com/chkr1011/MQTTnet"); + + Console.ForegroundColor = ConsoleColor.White; + Console.WriteLine(@" + Version: 1.0.0-alpha1 + License: MIT (read LICENSE file) +"); + + Console.BackgroundColor = ConsoleColor.White; + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine(" ! THIS IS AN ALPHA VERSION! IT IS NOT RECOMMENDED TO USE IT FOR ANY DIFFERENT PURPOSE THAN TESTING OR EVALUATING!"); + Console.WriteLine(); + } + + private static IWebHostBuilder CreateWebHostBuilder(string[] args) + { + var webHost = WebHost.CreateDefaultBuilder(args) + .UseKestrel(kestrelOptions => + { + kestrelOptions.ListenAnyIP(80, listenOptions => + { + listenOptions.NoDelay = true; + }); + }); + + //var listen = ReadListenSettings(); + //webHost + // .UseKestrel(o => + // { + // if (listen?.Length > 0) + // { + // foreach (var item in listen) + // { + // if (item.Address?.Trim() == "*") + // { + // if (item.Protocol == ListenProtocolTypes.MQTT) + // { + // o.ListenAnyIP(item.Port, c => c.UseMqtt()); + // } + // else + // { + // o.ListenAnyIP(item.Port); + // } + // } + // else if (item.Address?.Trim() == "localhost") + // { + // if (item.Protocol == ListenProtocolTypes.MQTT) + // { + // o.ListenLocalhost(item.Port, c => c.UseMqtt()); + // } + // else + // { + // o.ListenLocalhost(item.Port); + // } + // } + // else + // { + // if (IPAddress.TryParse(item.Address, out var ip)) + // { + // if (item.Protocol == ListenProtocolTypes.MQTT) + // { + // o.Listen(ip, item.Port, c => c.UseMqtt()); + // } + // else + // { + // o.Listen(ip, item.Port); + // } + // } + // } + // } + // } + // else + // { + // o.ListenAnyIP(1883, l => l.UseMqtt()); + // o.ListenAnyIP(5000); + // } + // }); + + webHost.UseStartup(); + + return webHost; + } + + public static ListenModel[] ReadListenSettings() + { + var builder = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables() + .Build(); + + var listen = new List(); + builder.Bind("MQTTnetServer:Listen", listen); + + return listen.ToArray(); + } + } +} diff --git a/Source/MQTTnet.Server/Scripting/DataSharing/DataSharingService.cs b/Source/MQTTnet.Server/Scripting/DataSharing/DataSharingService.cs new file mode 100644 index 0000000..41b6854 --- /dev/null +++ b/Source/MQTTnet.Server/Scripting/DataSharing/DataSharingService.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Logging; + +namespace MQTTnet.Server.Scripting.DataSharing +{ + public class DataSharingService + { + private readonly Dictionary _storage = new Dictionary(); + private readonly PythonScriptHostService _pythonScriptHostService; + private readonly ILogger _logger; + + public DataSharingService(PythonScriptHostService pythonScriptHostService, ILogger logger) + { + _pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public void Configure() + { + _pythonScriptHostService.RegisterProxyObject("write_shared_data", new Action(Write)); + _pythonScriptHostService.RegisterProxyObject("read_shared_data", new Func(Read)); + } + + public void Write(string key, object value) + { + lock (_storage) + { + _storage[key] = value; + _logger.LogInformation($"Shared data with key '{key}' updated."); + } + } + + public object Read(string key, object defaultValue) + { + lock (_storage) + { + if (!_storage.TryGetValue(key, out var value)) + { + return defaultValue; + } + + return value; + } + } + } +} diff --git a/Source/MQTTnet.Server/Scripting/PythonConvert.cs b/Source/MQTTnet.Server/Scripting/PythonConvert.cs new file mode 100644 index 0000000..532b691 --- /dev/null +++ b/Source/MQTTnet.Server/Scripting/PythonConvert.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections; +using System.Text; +using IronPython.Runtime; + +namespace MQTTnet.Server.Scripting +{ + public static class PythonConvert + { + public static object ToPython(object value) + { + if (value is PythonDictionary) + { + return value; + } + + if (value is string) + { + return value; + } + + if (value is int) + { + return value; + } + + if (value is float) + { + return value; + } + + if (value is bool) + { + return value; + } + + if (value is IDictionary dictionary) + { + var pythonDictionary = new PythonDictionary(); + foreach (DictionaryEntry dictionaryEntry in dictionary) + { + pythonDictionary.Add(dictionaryEntry.Key, ToPython(dictionaryEntry.Value)); + } + + return pythonDictionary; + } + + if (value is IEnumerable enumerable) + { + var pythonList = new List(); + foreach (var item in enumerable) + { + pythonList.Add(ToPython(item)); + } + + return pythonList; + } + + return value; + } + + public static string Pythonfy(Enum value) + { + return Pythonfy(value.ToString()); + } + + public static string Pythonfy(string value) + { + var result = new StringBuilder(); + foreach (var @char in value) + { + if (char.IsUpper(@char) && result.Length > 0) + { + result.Append('_'); + } + + result.Append(char.ToLowerInvariant(@char)); + } + + return result.ToString(); + } + + public static TEnum ParseEnum(string value) where TEnum : Enum + { + return (TEnum)Enum.Parse(typeof(TEnum), value.Replace("_", string.Empty), true); + } + } +} diff --git a/Source/MQTTnet.Server/Scripting/PythonIOStream.cs b/Source/MQTTnet.Server/Scripting/PythonIOStream.cs new file mode 100644 index 0000000..b346e89 --- /dev/null +++ b/Source/MQTTnet.Server/Scripting/PythonIOStream.cs @@ -0,0 +1,66 @@ +using System; +using System.IO; +using System.Text; +using Microsoft.Extensions.Logging; + +namespace MQTTnet.Server.Scripting +{ + public class PythonIOStream : Stream + { + private readonly ILogger _logger; + private readonly Encoding _encoder = Encoding.UTF8; + + public PythonIOStream(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + if (count == 0) + { + return; + } + + var text = _encoder.GetString(buffer, offset, count); + if (text.Equals(Environment.NewLine)) + { + return; + } + + _logger.LogDebug(text); + } + + public override bool CanRead { get; } = false; + public override bool CanSeek { get; } = false; + public override bool CanWrite { get; } = true; + public override long Length { get; } = 0L; + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + } +} diff --git a/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs b/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs new file mode 100644 index 0000000..4e8aff9 --- /dev/null +++ b/Source/MQTTnet.Server/Scripting/PythonScriptHostService.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Dynamic; +using System.IO; +using System.Linq; +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.Scripting; +using Microsoft.Scripting.Hosting; + +namespace MQTTnet.Server.Scripting +{ + public class PythonScriptHostService + { + private readonly IDictionary _proxyObjects = new ExpandoObject(); + private readonly List _scriptInstances = new List(); + private readonly ILogger _logger; + private readonly ScriptEngine _scriptEngine; + + public PythonScriptHostService(PythonIOStream pythonIOStream, ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + _scriptEngine = IronPython.Hosting.Python.CreateEngine(); + _scriptEngine.Runtime.IO.SetOutput(pythonIOStream, Encoding.UTF8); + } + + public void Configure() + { + AddSearchPaths(_scriptEngine); + + var scriptsDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Scripts"); + foreach (var filename in Directory.GetFiles(scriptsDirectory, "*.py", SearchOption.AllDirectories).OrderBy(file => file)) + { + TryInitializeScript(filename); + } + } + + public void RegisterProxyObject(string name, object action) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (action == null) throw new ArgumentNullException(nameof(action)); + + _proxyObjects.Add(name, action); + } + + public void InvokeOptionalFunction(string name, object parameters) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + + lock (_scriptInstances) + { + foreach (var pythonScriptInstance in _scriptInstances) + { + try + { + pythonScriptInstance.InvokeOptionalFunction(name, parameters); + } + catch (Exception exception) + { + _logger.LogError(exception, $"Error while invoking function '{name}' at script '{pythonScriptInstance.Name}'."); + } + } + } + } + + private void TryInitializeScript(string filename) + { + try + { + var scriptName = new FileInfo(filename).Name; + + _logger.LogTrace($"Initializing Python script '{scriptName}'..."); + var code = File.ReadAllText(filename); + + var scriptInstance = CreateScriptInstance(scriptName, code); + scriptInstance.InvokeOptionalFunction("initialize"); + + _scriptInstances.Add(scriptInstance); + + _logger.LogInformation($"Initialized script '{scriptName}'."); + } + catch (Exception exception) + { + _logger.LogError(exception, $"Error while initializing script '{new FileInfo(filename).Name}'."); + } + } + + private PythonScriptInstance CreateScriptInstance(string name, string code) + { + var scriptScope = _scriptEngine.CreateScope(); + + var source = scriptScope.Engine.CreateScriptSourceFromString(code, SourceCodeKind.File); + var compiledCode = source.Compile(); + + scriptScope.SetVariable("mqtt_net_server", _proxyObjects); + compiledCode.Execute(scriptScope); + + return new PythonScriptInstance(name, scriptScope); + } + + private void AddSearchPaths(ScriptEngine scriptEngine) + { + var paths = new List + { + Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Lib"), + "/usr/lib/python2.7", + @"C:\Python27\Lib" + }; + + AddSearchPaths(scriptEngine, paths); + } + + private void AddSearchPaths(ScriptEngine scriptEngine, IEnumerable paths) + { + var searchPaths = scriptEngine.GetSearchPaths(); + + foreach (var path in paths) + { + if (Directory.Exists(path)) + { + searchPaths.Add(path); + _logger.LogInformation($"Added Python lib path: {path}"); + } + } + + scriptEngine.SetSearchPaths(searchPaths); + } + } +} diff --git a/Source/MQTTnet.Server/Scripting/PythonScriptInstance.cs b/Source/MQTTnet.Server/Scripting/PythonScriptInstance.cs new file mode 100644 index 0000000..91a4634 --- /dev/null +++ b/Source/MQTTnet.Server/Scripting/PythonScriptInstance.cs @@ -0,0 +1,57 @@ +using System; +using IronPython.Runtime; +using Microsoft.Scripting.Hosting; + +namespace MQTTnet.Server.Scripting +{ + public class PythonScriptInstance + { + private readonly ScriptScope _scriptScope; + + public PythonScriptInstance(string name, ScriptScope scriptScope) + { + _scriptScope = scriptScope; + + Name = name; + } + + public string Name { get; } + + public bool InvokeOptionalFunction(string name, params object[] parameters) + { + return InvokeOptionalFunction(name, parameters, out _); + } + + public bool InvokeOptionalFunction(string name, object[] parameters, out object result) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + + lock (_scriptScope) + { + if (!_scriptScope.Engine.Operations.TryGetMember(_scriptScope, name, out var member)) + { + result = null; + return false; + } + + if (!(member is PythonFunction function)) + { + throw new Exception($"Member '{name}' is no Python function."); + } + + try + { + result = _scriptScope.Engine.Operations.Invoke(function, parameters); + return true; + } + catch (Exception exception) + { + var details = _scriptScope.Engine.GetService().FormatException(exception); + var message = $"Error while invoking function '{name}'. " + Environment.NewLine + details; + + throw new Exception(message, exception); + } + } + } + } +} diff --git a/Source/MQTTnet.Server/Scripts/00_sample.py b/Source/MQTTnet.Server/Scripts/00_sample.py new file mode 100644 index 0000000..51763d8 --- /dev/null +++ b/Source/MQTTnet.Server/Scripts/00_sample.py @@ -0,0 +1,74 @@ +import json + + +def initialize(): + """ + This function is invoked after the script file has been loaded. + It will be executed only one time. + """ + + print("Hello World from Sample script.") + + +def on_validate_client_connection(context): + """ + This function is invoked whenever a client wants to connect. It can be used to validate the connection. + """ + + print(context) + + mqtt_net_server.write_shared_data(context["client_id"], {"custom_value_1": 1, "custom_value_2": True}) + + return + + if context["client_id"] != "test_client": + context["result"] = "connection_refused_not_authorized" + return + + if context["username"] != "bud spencer": + context["result"] = "connection_refused_not_authorized" + return + + if context["password"] != "secret": + context["result"] = "connection_refused_not_authorized" + + print(context) + + +def on_intercept_subscription(context): + """ + This function is invoked whenever a client wants to subscribe to a topic. + """ + + print("Client '{client_id}' want's to subscribe to topic '{topic}'.".format(client_id=context["client_id"], topic=context["topic"])) + + +def on_intercept_application_message(context): + """ + This function is invoked for every processed application message. It also allows modifying + the message or cancel processing at all. + """ + client_id = context["client_id"] + + if client_id != None: + shared_data = mqtt_net_server.read_shared_data(context["client_id"], {}) + print(shared_data) + + if context["topic"] == "topic_with_response": + + json_payload = { + "hello": "world", + "x": 1, + "y": True, + "z": None + } + + application_message = { + "retain": False, + "topic": "reply", + "payload": json.dumps(json_payload) + } + + mqtt_net_server.publish(application_message) + + print("Client '{client_id}' published topic '{topic}'.".format(client_id=context["client_id"], topic=context["topic"])) diff --git a/Source/MQTTnet.Server/Scripts/readme.md b/Source/MQTTnet.Server/Scripts/readme.md new file mode 100644 index 0000000..fb4cc99 --- /dev/null +++ b/Source/MQTTnet.Server/Scripts/readme.md @@ -0,0 +1,19 @@ +This directory contains scripts which are loaded by the server and can be used to perform the following tasks. + +1. Validation of client connections via credentials, client IDs etc. +2. Manipulation of every processed message. +3. Validation of subscription attempts. +4. Publishing of custom application messages. + +All scripts are loaded and _MQTTnet Server_ will invoke functions according to predefined naming conventions. +If a function is implemented in multiple scripts the context will be moved throug all instances. This allows overriding of results or passing data to other (following) scripts. + +The Python starndard library ships with _MQTTnet Server_. But it is possible to add custom paths with Python libraries. + +``` +import sys +sys.path.append(PATH_TO_LIBRARY) +``` + +* All scripts must have the file extension _.py_. +* All scripts are sorted alphabetically (A to Z) before being loaded and parsed. \ No newline at end of file diff --git a/Source/MQTTnet.Server/Startup.cs b/Source/MQTTnet.Server/Startup.cs new file mode 100644 index 0000000..ad7f40e --- /dev/null +++ b/Source/MQTTnet.Server/Startup.cs @@ -0,0 +1,69 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using MQTTnet.AspNetCore; +using MQTTnet.Server.Logging; +using MQTTnet.Server.Mqtt; +using MQTTnet.Server.Scripting; +using MQTTnet.Server.Scripting.DataSharing; + +namespace MQTTnet.Server +{ + public class Startup + { + public Startup(IConfiguration configuration) + { + //var builder = new ConfigurationBuilder() + // .AddJsonFile("appsettings.json") + // .AddEnvironmentVariables(); + + //Configuration = builder.Build(); + } + + public IConfigurationRoot Configuration { get; } + + public void Configure( + IApplicationBuilder application, + IHostingEnvironment environment, + MqttServerService mqttServerService, + PythonScriptHostService pythonScriptHostService, + DataSharingService dataSharingService) + { + if (environment.IsDevelopment()) + { + application.UseDeveloperExceptionPage(); + } + else + { + application.UseHsts(); + } + + application.UseHttpsRedirection(); + application.UseMvc(); + + dataSharingService.Configure(); + pythonScriptHostService.Configure(); + mqttServerService.Configure(); + } + + public void ConfigureServices(IServiceCollection services) + { + services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); + } + } +} \ No newline at end of file diff --git a/Source/MQTTnetServer/appsettings.Development.json b/Source/MQTTnet.Server/appsettings.Development.json similarity index 100% rename from Source/MQTTnetServer/appsettings.Development.json rename to Source/MQTTnet.Server/appsettings.Development.json diff --git a/Source/MQTTnetServer/appsettings.json b/Source/MQTTnet.Server/appsettings.json similarity index 100% rename from Source/MQTTnetServer/appsettings.json rename to Source/MQTTnet.Server/appsettings.json diff --git a/Source/MQTTnet/MqttFactory.cs b/Source/MQTTnet/MqttFactory.cs index 3f1e1ac..64725e2 100644 --- a/Source/MQTTnet/MqttFactory.cs +++ b/Source/MQTTnet/MqttFactory.cs @@ -10,6 +10,17 @@ namespace MQTTnet { public class MqttFactory : IMqttClientFactory, IMqttServerFactory { + private readonly IMqttNetLogger _logger; + + public MqttFactory() : this(new MqttNetLogger()) + { + } + + public MqttFactory(IMqttNetLogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + public IMqttClient CreateMqttClient() { return CreateMqttClient(new MqttNetLogger()); @@ -57,5 +68,12 @@ namespace MQTTnet return new MqttServer(adapters, logger.CreateChildLogger()); } + + public IMqttServer CreateMqttServer(IEnumerable adapters) + { + if (adapters == null) throw new ArgumentNullException(nameof(adapters)); + + return new MqttServer(adapters, _logger.CreateChildLogger()); + } } } \ No newline at end of file diff --git a/Source/MQTTnetServer/MQTTnetServer.xml b/Source/MQTTnetServer/MQTTnetServer.xml deleted file mode 100644 index 762bc40..0000000 --- a/Source/MQTTnetServer/MQTTnetServer.xml +++ /dev/null @@ -1,116 +0,0 @@ - - - - MQTTnetServer - - - - - Main Entry point - - - - - Main - - - - - - Configure and Start Kestrel - - - - - - - Read Application Settings - - - - - - Listen Entry Settings Model - - - - - Constructor - - - - - Listen Address - - - - - Listen Port - - - - - Protocol Type - - - - - Listen Protocol Types - - - - - HTTP - - - - - HTTPS - - - - - MQTT - - - - - Main Settings Model - - - - - Listen Settings - - - - - Web App Startup - - - - - Constructor - - - - - - Application Settings - - - - - This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - - - - - - - This method gets called by the runtime. Use this method to add services to the container. - - - - - diff --git a/Source/MQTTnetServer/Program.cs b/Source/MQTTnetServer/Program.cs deleted file mode 100644 index cdcb35d..0000000 --- a/Source/MQTTnetServer/Program.cs +++ /dev/null @@ -1,118 +0,0 @@ -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using MQTTnet.AspNetCore; -using MQTTnetServer.Settings; -using System; -using System.Collections.Generic; -using System.IO; -using System.Net; - -namespace MQTTnetServer -{ - /// - /// Main Entry point - /// - public class Program - { - /// - /// Main - /// - /// - public static void Main(string[] args) - { - try - { - CreateWebHostBuilder(args).Build().Run(); - } - catch (FileNotFoundException e) - { - Console.WriteLine("Could not find application settings file in: " + e.FileName); - return; - } - } - - /// - /// Configure and Start Kestrel - /// - /// - /// - public static IWebHostBuilder CreateWebHostBuilder(string[] args) - { - var webHost = WebHost.CreateDefaultBuilder(args); - var listen = ReadListenSettings(); - webHost - .UseKestrel(o => - { - if (listen?.Length > 0) - { - foreach (var item in listen) - { - if (item.Address?.Trim() == "*") - { - if (item.Protocol == ListenProtocolTypes.MQTT) - { - o.ListenAnyIP(item.Port, c => c.UseMqtt()); - } - else - { - o.ListenAnyIP(item.Port); - } - } - else if (item.Address?.Trim() == "localhost") - { - if (item.Protocol == ListenProtocolTypes.MQTT) - { - o.ListenLocalhost(item.Port, c => c.UseMqtt()); - } - else - { - o.ListenLocalhost(item.Port); - } - } - else - { - if (IPAddress.TryParse(item.Address, out var ip)) - { - if (item.Protocol == ListenProtocolTypes.MQTT) - { - o.Listen(ip, item.Port, c => c.UseMqtt()); - } - else - { - o.Listen(ip, item.Port); - } - } - } - } - } - else - { - o.ListenAnyIP(1883, l => l.UseMqtt()); - o.ListenAnyIP(5000); - } - }); - - webHost.UseStartup(); - - return webHost; - } - - /// - /// Read Application Settings - /// - /// - public static ListenModel[] ReadListenSettings() - { - var builder = new ConfigurationBuilder() - .AddJsonFile("appsettings.json") - .AddEnvironmentVariables() - .Build(); - - var listen = new List(); - builder.Bind("MQTTnetServer:Listen", listen); - - return listen.ToArray(); - } - } -} diff --git a/Source/MQTTnetServer/Startup.cs b/Source/MQTTnetServer/Startup.cs deleted file mode 100644 index 4c02756..0000000 --- a/Source/MQTTnetServer/Startup.cs +++ /dev/null @@ -1,61 +0,0 @@ -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace MQTTnetServer -{ - /// - /// Web App Startup - /// - public class Startup - { - /// - /// Constructor - /// - /// - public Startup(IConfiguration configuration) - { - var builder = new ConfigurationBuilder() - .AddJsonFile("appsettings.json") - .AddEnvironmentVariables(); - Configuration = builder.Build(); - } - - /// - /// Application Settings - /// - public IConfigurationRoot Configuration { get; } - - /// - /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - /// - /// - /// - public void Configure(IApplicationBuilder app, IHostingEnvironment env) - { - if (env.IsDevelopment()) - { - app.UseDeveloperExceptionPage(); - } - else - { - // The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts. - app.UseHsts(); - } - - app.UseHttpsRedirection(); - app.UseMvc(); - } - - /// - /// This method gets called by the runtime. Use this method to add services to the container. - /// - /// - public void ConfigureServices(IServiceCollection services) - { - services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); - } - } -} \ No newline at end of file