@@ -1,7 +1,7 @@ | |||
| |||
Microsoft Visual Studio Solution File, Format Version 12.00 | |||
# Visual Studio Version 16 | |||
VisualStudioVersion = 16.0.28729.10 | |||
# Visual Studio 15 | |||
VisualStudioVersion = 15.0.28307.645 | |||
MinimumVisualStudioVersion = 10.0.40219.1 | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" | |||
EndProject | |||
@@ -1,9 +1,9 @@ | |||
namespace MQTTnet.Server.Configuration | |||
{ | |||
/// <summary> | |||
/// Main Settings Model | |||
/// MQTT settings Model | |||
/// </summary> | |||
public class SettingsModel | |||
public class MqttSettingsModel | |||
{ | |||
/// <summary> | |||
/// Set default connection timeout in seconds |
@@ -6,6 +6,6 @@ | |||
public int WriteInterval { get; set; } = 10; | |||
public string Filename { get; set; } = "RetainedApplicationMessages.json"; | |||
public string Path { get; set; } | |||
} | |||
} |
@@ -0,0 +1,9 @@ | |||
using System.Collections.Generic; | |||
namespace MQTTnet.Server.Configuration | |||
{ | |||
public class ScriptingSettingsModel | |||
{ | |||
public List<string> IncludePaths { get; set; } | |||
} | |||
} |
@@ -4,12 +4,14 @@ using System.Linq; | |||
using System.Net; | |||
using System.Threading.Tasks; | |||
using System.Web; | |||
using Microsoft.AspNetCore.Authorization; | |||
using Microsoft.AspNetCore.Mvc; | |||
using MQTTnet.Server.Mqtt; | |||
using MQTTnet.Server.Status; | |||
namespace MQTTnet.Server.Controllers | |||
{ | |||
[Authorize] | |||
[ApiController] | |||
public class ClientsController : ControllerBase | |||
{ | |||
@@ -57,5 +59,21 @@ namespace MQTTnet.Server.Controllers | |||
await client.DisconnectAsync(); | |||
return StatusCode((int)HttpStatusCode.NoContent); | |||
} | |||
[Route("api/v1/clients/{clientId}/statistics")] | |||
[HttpDelete] | |||
public async Task<ActionResult> DeleteClientStatistics(string clientId) | |||
{ | |||
clientId = HttpUtility.UrlDecode(clientId); | |||
var client = (await _mqttServerService.GetClientStatusAsync()).FirstOrDefault(c => c.ClientId == clientId); | |||
if (client == null) | |||
{ | |||
return new StatusCodeResult((int)HttpStatusCode.NotFound); | |||
} | |||
client.ResetStatistics(); | |||
return StatusCode((int)HttpStatusCode.NoContent); | |||
} | |||
} | |||
} |
@@ -4,11 +4,13 @@ using System.Linq; | |||
using System.Net; | |||
using System.Threading.Tasks; | |||
using System.Web; | |||
using Microsoft.AspNetCore.Authorization; | |||
using Microsoft.AspNetCore.Mvc; | |||
using MQTTnet.Server.Mqtt; | |||
namespace MQTTnet.Server.Controllers | |||
{ | |||
[Authorize] | |||
[ApiController] | |||
public class RetainedApplicationMessagesController : ControllerBase | |||
{ | |||
@@ -1,11 +1,65 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Microsoft.AspNetCore.Authorization; | |||
using Microsoft.AspNetCore.Mvc; | |||
using MQTTnet.Server.Scripting; | |||
using MQTTnet.Server.Web; | |||
namespace MQTTnet.Server.Controllers | |||
{ | |||
public class ScriptsController | |||
[Authorize] | |||
[ApiController] | |||
public class ScriptsController : Controller | |||
{ | |||
private readonly PythonScriptHostService _pythonScriptHostService; | |||
public ScriptsController(PythonScriptHostService pythonScriptHostService) | |||
{ | |||
_pythonScriptHostService = pythonScriptHostService ?? throw new ArgumentNullException(nameof(pythonScriptHostService)); | |||
} | |||
[Route("api/v1/scripts")] | |||
[HttpGet] | |||
public ActionResult<List<string>> GetScriptUids() | |||
{ | |||
return _pythonScriptHostService.GetScriptUids(); | |||
} | |||
[Route("api/v1/scripts/uid")] | |||
[HttpGet] | |||
public async Task<ActionResult<string>> GetScript(string uid) | |||
{ | |||
var script = await _pythonScriptHostService.ReadScriptAsync(uid, HttpContext.RequestAborted); | |||
if (script == null) | |||
{ | |||
return NotFound(); | |||
} | |||
return Content(script); | |||
} | |||
[Route("api/v1/scripts/uid")] | |||
[HttpPost] | |||
public Task PostScript(string uid) | |||
{ | |||
var code = HttpContext.Request.ReadBodyAsString(); | |||
return _pythonScriptHostService.WriteScriptAsync(uid, code, CancellationToken.None); | |||
} | |||
[Route("api/v1/scripts/uid")] | |||
[HttpDelete] | |||
public Task DeleteScript(string uid) | |||
{ | |||
return _pythonScriptHostService.DeleteScriptAsync(uid); | |||
} | |||
[Route("api/v1/scripts/initialize")] | |||
[HttpPost] | |||
public Task PostInitializeScripts() | |||
{ | |||
return _pythonScriptHostService.TryInitializeScriptsAsync(); | |||
} | |||
} | |||
} |
@@ -4,12 +4,14 @@ using System.Linq; | |||
using System.Net; | |||
using System.Threading.Tasks; | |||
using System.Web; | |||
using Microsoft.AspNetCore.Authorization; | |||
using Microsoft.AspNetCore.Mvc; | |||
using MQTTnet.Server.Mqtt; | |||
using MQTTnet.Server.Status; | |||
namespace MQTTnet.Server.Controllers | |||
{ | |||
[Authorize] | |||
[ApiController] | |||
public class SessionsController : ControllerBase | |||
{ | |||
@@ -20,6 +20,7 @@ | |||
<PropertyGroup> | |||
<StartupObject></StartupObject> | |||
<UserSecretsId>c564f0de-28b4-45bf-b726-4d665d705653</UserSecretsId> | |||
</PropertyGroup> | |||
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> | |||
@@ -28,6 +29,13 @@ | |||
<NoWarn>1701;1702</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<Compile Remove="wwwroot\**" /> | |||
<Content Remove="wwwroot\**" /> | |||
<EmbeddedResource Remove="wwwroot\**" /> | |||
<None Remove="wwwroot\**" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<Content Update="appsettings.json"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
@@ -54,23 +62,23 @@ | |||
<None Update="LICENSE"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
<None Update="run.sh"> | |||
<None Update="README.md"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
<None Update="run.bat"> | |||
<None Update="Scripts\README.md"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
<None Update="Scripts\readme.md"> | |||
<None Update="Scripts\00_sample.py"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
<None Update="Scripts\00_sample.py"> | |||
<None Update="Web\authorization_handler.py"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<Folder Include="Properties\PublishProfiles\" /> | |||
<Folder Include="wwwroot\" /> | |||
<Folder Include="Web\wwwroot\" /> | |||
</ItemGroup> | |||
</Project> |
@@ -12,7 +12,7 @@ namespace MQTTnet.Server.Mqtt | |||
{ | |||
private readonly MqttFactory _mqttFactory; | |||
public CustomMqttFactory(SettingsModel settings, ILogger<MqttServer> logger) | |||
public CustomMqttFactory(MqttSettingsModel settings, ILogger<MqttServer> logger) | |||
{ | |||
if (settings == null) throw new ArgumentNullException(nameof(settings)); | |||
if (logger == null) throw new ArgumentNullException(nameof(logger)); | |||
@@ -23,7 +23,7 @@ namespace MQTTnet.Server.Mqtt | |||
{ | |||
private readonly ILogger<MqttServerService> _logger; | |||
private readonly SettingsModel _settings; | |||
private readonly MqttSettingsModel _settings; | |||
private readonly MqttApplicationMessageInterceptor _mqttApplicationMessageInterceptor; | |||
private readonly MqttServerStorage _mqttServerStorage; | |||
private readonly MqttClientConnectedHandler _mqttClientConnectedHandler; | |||
@@ -37,7 +37,7 @@ namespace MQTTnet.Server.Mqtt | |||
private readonly MqttWebSocketServerAdapter _webSocketServerAdapter; | |||
public MqttServerService( | |||
SettingsModel settings, | |||
MqttSettingsModel mqttSettings, | |||
CustomMqttFactory mqttFactory, | |||
MqttClientConnectedHandler mqttClientConnectedHandler, | |||
MqttClientDisconnectedHandler mqttClientDisconnectedHandler, | |||
@@ -50,7 +50,7 @@ namespace MQTTnet.Server.Mqtt | |||
PythonScriptHostService pythonScriptHostService, | |||
ILogger<MqttServerService> logger) | |||
{ | |||
_settings = settings ?? throw new ArgumentNullException(nameof(settings)); | |||
_settings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings)); | |||
_mqttClientConnectedHandler = mqttClientConnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientConnectedHandler)); | |||
_mqttClientDisconnectedHandler = mqttClientDisconnectedHandler ?? throw new ArgumentNullException(nameof(mqttClientDisconnectedHandler)); | |||
_mqttClientSubscribedTopicHandler = mqttClientSubscribedTopicHandler ?? throw new ArgumentNullException(nameof(mqttClientSubscribedTopicHandler)); | |||
@@ -14,27 +14,28 @@ namespace MQTTnet.Server.Mqtt | |||
{ | |||
private readonly List<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>(); | |||
private readonly SettingsModel _settings; | |||
private readonly MqttSettingsModel _mqttSettings; | |||
private readonly ILogger<MqttServerStorage> _logger; | |||
private string _filename; | |||
private string _path; | |||
private bool _messagesHaveChanged; | |||
public MqttServerStorage(SettingsModel settings, ILogger<MqttServerStorage> logger) | |||
public MqttServerStorage(MqttSettingsModel mqttSettings, ILogger<MqttServerStorage> logger) | |||
{ | |||
_settings = settings ?? throw new ArgumentNullException(nameof(settings)); | |||
_mqttSettings = mqttSettings ?? throw new ArgumentNullException(nameof(mqttSettings)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
} | |||
public void Configure() | |||
{ | |||
if (_settings.RetainedApplicationMessages?.Persist != true) | |||
if (_mqttSettings.RetainedApplicationMessages?.Persist != true || | |||
string.IsNullOrEmpty(_mqttSettings.RetainedApplicationMessages.Path)) | |||
{ | |||
_logger.LogInformation("Persisting of retained application messages is disabled."); | |||
return; | |||
} | |||
_filename = ExpandFilename(); | |||
_path = ExpandPath(); | |||
// The retained application messages are stored in a separate thread. | |||
// This is mandatory because writing them to a slow storage (like RaspberryPi SD card) | |||
@@ -61,7 +62,7 @@ namespace MQTTnet.Server.Mqtt | |||
{ | |||
try | |||
{ | |||
await Task.Delay(TimeSpan.FromSeconds(_settings.RetainedApplicationMessages.WriteInterval)).ConfigureAwait(false); | |||
await Task.Delay(TimeSpan.FromSeconds(_mqttSettings.RetainedApplicationMessages.WriteInterval)).ConfigureAwait(false); | |||
List<MqttApplicationMessage> messages; | |||
lock (_messages) | |||
@@ -76,7 +77,7 @@ namespace MQTTnet.Server.Mqtt | |||
} | |||
var json = JsonConvert.SerializeObject(messages); | |||
await File.WriteAllTextAsync(_filename, json, Encoding.UTF8).ConfigureAwait(false); | |||
await File.WriteAllTextAsync(_path, json, Encoding.UTF8).ConfigureAwait(false); | |||
_logger.LogInformation($"{messages.Count} retained MQTT messages written."); | |||
} | |||
@@ -89,19 +90,19 @@ namespace MQTTnet.Server.Mqtt | |||
public async Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() | |||
{ | |||
if (_settings.RetainedApplicationMessages?.Persist != true) | |||
if (_mqttSettings.RetainedApplicationMessages?.Persist != true) | |||
{ | |||
return null; | |||
} | |||
if (!File.Exists(_filename)) | |||
if (!File.Exists(_path)) | |||
{ | |||
return null; | |||
} | |||
try | |||
{ | |||
var json = await File.ReadAllTextAsync(_filename).ConfigureAwait(false); | |||
var json = await File.ReadAllTextAsync(_path).ConfigureAwait(false); | |||
var applicationMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json); | |||
_logger.LogInformation($"{applicationMessages.Count} retained MQTT messages loaded."); | |||
@@ -115,17 +116,17 @@ namespace MQTTnet.Server.Mqtt | |||
} | |||
} | |||
private string ExpandFilename() | |||
private string ExpandPath() | |||
{ | |||
var filename = _settings.RetainedApplicationMessages.Filename; | |||
var path = _mqttSettings.RetainedApplicationMessages.Path; | |||
var uri = new Uri(filename, UriKind.RelativeOrAbsolute); | |||
var uri = new Uri(path, UriKind.RelativeOrAbsolute); | |||
if (!uri.IsAbsoluteUri) | |||
{ | |||
filename = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, filename); | |||
path = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, path); | |||
} | |||
return filename; | |||
return path; | |||
} | |||
} | |||
} |
@@ -2,6 +2,7 @@ | |||
using System.Reflection; | |||
using Microsoft.AspNetCore; | |||
using Microsoft.AspNetCore.Hosting; | |||
using MQTTnet.Server.Web; | |||
namespace MQTTnet.Server | |||
{ | |||
@@ -0,0 +1,10 @@ | |||
# Starting portable version | |||
The portable version requires a local installation of the .net core runtime. With that runtime installed the server can be started via the following comand. | |||
dotnet .\MQTTnet.Server.dll | |||
# Starting self contained versions | |||
The self contained versions are fully portable versions including the .net core runtime. The server can be started using the contained executable files. | |||
Windows: MQTTnet.Server.exe | |||
Linux: MQTTnet.Server (must be set to _executable_ first) |
@@ -1 +0,0 @@ | |||
[{"Topic":"a","Payload":"YWFzYXNhc2E=","QualityOfServiceLevel":0,"Retain":true,"UserProperties":[],"ContentType":null,"ResponseTopic":null,"PayloadFormatIndicator":null,"MessageExpiryInterval":null,"TopicAlias":null,"CorrelationData":null,"SubscriptionIdentifier":null},{"Topic":"fgdfgd","Payload":"YWFzYXNhc2E=","QualityOfServiceLevel":0,"Retain":true,"UserProperties":[],"ContentType":null,"ResponseTopic":null,"PayloadFormatIndicator":null,"MessageExpiryInterval":null,"TopicAlias":null,"CorrelationData":null,"SubscriptionIdentifier":null}] |
@@ -4,9 +4,12 @@ using System.Dynamic; | |||
using System.IO; | |||
using System.Linq; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Scripting; | |||
using Microsoft.Scripting.Hosting; | |||
using MQTTnet.Server.Configuration; | |||
namespace MQTTnet.Server.Scripting | |||
{ | |||
@@ -14,11 +17,14 @@ namespace MQTTnet.Server.Scripting | |||
{ | |||
private readonly IDictionary<string, object> _proxyObjects = new ExpandoObject(); | |||
private readonly List<PythonScriptInstance> _scriptInstances = new List<PythonScriptInstance>(); | |||
private readonly string _scriptsPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Scripts"); | |||
private readonly ScriptingSettingsModel _scriptingSettings; | |||
private readonly ILogger<PythonScriptHostService> _logger; | |||
private readonly ScriptEngine _scriptEngine; | |||
public PythonScriptHostService(PythonIOStream pythonIOStream, ILogger<PythonScriptHostService> logger) | |||
public PythonScriptHostService(ScriptingSettingsModel scriptingSettings, PythonIOStream pythonIOStream, ILogger<PythonScriptHostService> logger) | |||
{ | |||
_scriptingSettings = scriptingSettings ?? throw new ArgumentNullException(nameof(scriptingSettings)); | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
_scriptEngine = IronPython.Hosting.Python.CreateEngine(); | |||
@@ -29,11 +35,7 @@ namespace MQTTnet.Server.Scripting | |||
{ | |||
AddSearchPaths(_scriptEngine); | |||
var scriptsDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Scripts"); | |||
foreach (var filename in Directory.GetFiles(scriptsDirectory, "*.py", SearchOption.AllDirectories).OrderBy(file => file)) | |||
{ | |||
TryInitializeScript(filename); | |||
} | |||
TryInitializeScriptsAsync().GetAwaiter().GetResult(); | |||
} | |||
public void RegisterProxyObject(string name, object @object) | |||
@@ -58,35 +60,109 @@ namespace MQTTnet.Server.Scripting | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.LogError(exception, $"Error while invoking function '{name}' at script '{pythonScriptInstance.Name}'."); | |||
_logger.LogError(exception, $"Error while invoking function '{name}' at script '{pythonScriptInstance.Uid}'."); | |||
} | |||
} | |||
} | |||
} | |||
private void TryInitializeScript(string filename) | |||
public List<string> GetScriptUids() | |||
{ | |||
try | |||
lock (_scriptInstances) | |||
{ | |||
var scriptName = new FileInfo(filename).Name; | |||
return _scriptInstances.Select(si => si.Uid).ToList(); | |||
} | |||
} | |||
_logger.LogTrace($"Initializing Python script '{scriptName}'..."); | |||
var code = File.ReadAllText(filename); | |||
public Task<string> ReadScriptAsync(string uid, CancellationToken cancellationToken) | |||
{ | |||
if (uid == null) throw new ArgumentNullException(nameof(uid)); | |||
var scriptInstance = CreateScriptInstance(scriptName, code); | |||
scriptInstance.InvokeOptionalFunction("initialize"); | |||
string path; | |||
_scriptInstances.Add(scriptInstance); | |||
lock (_scriptInstances) | |||
{ | |||
path = _scriptInstances.FirstOrDefault(si => si.Uid == uid)?.Path; | |||
} | |||
_logger.LogInformation($"Initialized script '{scriptName}'."); | |||
if (path == null || !File.Exists(path)) | |||
{ | |||
return null; | |||
} | |||
return File.ReadAllTextAsync(path, Encoding.UTF8, cancellationToken); | |||
} | |||
public async Task WriteScriptAsync(string uid, string code, CancellationToken cancellationToken) | |||
{ | |||
var path = Path.Combine(_scriptsPath, uid + ".py"); | |||
await File.WriteAllTextAsync(path, code, Encoding.UTF8, cancellationToken).ConfigureAwait(false); | |||
await TryInitializeScriptsAsync().ConfigureAwait(false); | |||
} | |||
public async Task DeleteScriptAsync(string uid) | |||
{ | |||
var path = Path.Combine(_scriptsPath, uid + ".py"); | |||
if (File.Exists(path)) | |||
{ | |||
File.Delete(path); | |||
await TryInitializeScriptsAsync().ConfigureAwait(false); | |||
} | |||
} | |||
public async Task TryInitializeScriptsAsync() | |||
{ | |||
lock (_scriptInstances) | |||
{ | |||
foreach (var scriptInstance in _scriptInstances) | |||
{ | |||
try | |||
{ | |||
scriptInstance.InvokeOptionalFunction("destroy"); | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.LogWarning(exception, $"Error while unloading script '{scriptInstance.Uid}'."); | |||
} | |||
} | |||
_scriptInstances.Clear(); | |||
} | |||
foreach (var path in Directory.GetFiles(_scriptsPath, "*.py", SearchOption.AllDirectories).OrderBy(file => file)) | |||
{ | |||
await TryInitializeScriptAsync(path).ConfigureAwait(false); | |||
} | |||
} | |||
private async Task TryInitializeScriptAsync(string path) | |||
{ | |||
var uid = new FileInfo(path).Name.Replace(".py", string.Empty, StringComparison.OrdinalIgnoreCase); | |||
try | |||
{ | |||
_logger.LogTrace($"Initializing Python script '{uid}'..."); | |||
var code = await File.ReadAllTextAsync(path).ConfigureAwait(false); | |||
var scriptInstance = CreateScriptInstance(uid, path, code); | |||
scriptInstance.InvokeOptionalFunction("initialize"); | |||
lock (_scriptInstances) | |||
{ | |||
_scriptInstances.Add(scriptInstance); | |||
} | |||
_logger.LogInformation($"Initialized script '{uid}'."); | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.LogError(exception, $"Error while initializing script '{new FileInfo(filename).Name}'."); | |||
_logger.LogError(exception, $"Error while initializing script '{uid}'."); | |||
} | |||
} | |||
private PythonScriptInstance CreateScriptInstance(string name, string code) | |||
private PythonScriptInstance CreateScriptInstance(string uid, string path, string code) | |||
{ | |||
var scriptScope = _scriptEngine.CreateScope(); | |||
@@ -96,31 +172,32 @@ namespace MQTTnet.Server.Scripting | |||
scriptScope.SetVariable("mqtt_net_server", _proxyObjects); | |||
compiledCode.Execute(scriptScope); | |||
return new PythonScriptInstance(name, scriptScope); | |||
return new PythonScriptInstance(uid, path, scriptScope); | |||
} | |||
private void AddSearchPaths(ScriptEngine scriptEngine) | |||
{ | |||
var paths = new List<string> | |||
if (_scriptingSettings.IncludePaths?.Any() != true) | |||
{ | |||
Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Lib"), | |||
"/usr/lib/python2.7", | |||
@"C:\Python27\Lib" | |||
}; | |||
AddSearchPaths(scriptEngine, paths); | |||
} | |||
return; | |||
} | |||
private void AddSearchPaths(ScriptEngine scriptEngine, IEnumerable<string> paths) | |||
{ | |||
var searchPaths = scriptEngine.GetSearchPaths(); | |||
foreach (var path in paths) | |||
foreach (var path in _scriptingSettings.IncludePaths) | |||
{ | |||
if (Directory.Exists(path)) | |||
var effectivePath = path; | |||
var uri = new Uri(effectivePath, UriKind.RelativeOrAbsolute); | |||
if (!uri.IsAbsoluteUri) | |||
{ | |||
effectivePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, effectivePath); | |||
} | |||
if (Directory.Exists(effectivePath)) | |||
{ | |||
searchPaths.Add(path); | |||
_logger.LogInformation($"Added Python lib path: {path}"); | |||
searchPaths.Add(effectivePath); | |||
_logger.LogInformation($"Added Python lib path: {effectivePath}"); | |||
} | |||
} | |||
@@ -8,14 +8,17 @@ namespace MQTTnet.Server.Scripting | |||
{ | |||
private readonly ScriptScope _scriptScope; | |||
public PythonScriptInstance(string name, ScriptScope scriptScope) | |||
public PythonScriptInstance(string uid, string path, ScriptScope scriptScope) | |||
{ | |||
_scriptScope = scriptScope; | |||
Uid = uid; | |||
Path = path; | |||
Name = name; | |||
_scriptScope = scriptScope; | |||
} | |||
public string Name { get; } | |||
public string Uid { get; } | |||
public string Path { get; } | |||
public bool InvokeOptionalFunction(string name, params object[] parameters) | |||
{ | |||
@@ -7,9 +7,17 @@ def initialize(): | |||
It will be executed only one time. | |||
""" | |||
print("Hello World from Sample script.") | |||
print("Hello World from sample script.") | |||
def destroy(): | |||
""" | |||
This function is invoked when the script is unloaded due to a script file update etc. | |||
""" | |||
print("Bye from sample script.") | |||
def on_validate_client_connection(context): | |||
""" | |||
This function is invoked whenever a client wants to connect. It can be used to validate the connection. | |||
@@ -1,9 +1,11 @@ | |||
This directory contains scripts which are loaded by the server and can be used to perform the following tasks. | |||
# MQTT Scripts | |||
1. Validation of client connections via credentials, client IDs etc. | |||
2. Manipulation of every processed message. | |||
3. Validation of subscription attempts. | |||
4. Publishing of custom application messages. | |||
This directory contains scripts which are loaded by the server and can be used to perform the following tasks. | |||
* Validation of client connections via credentials, client IDs etc. | |||
* Manipulation of every processed message. | |||
* Validation of subscription attempts. | |||
* Publishing of custom application messages. | |||
All scripts are loaded and _MQTTnet Server_ will invoke functions according to predefined naming conventions. | |||
If a function is implemented in multiple scripts the context will be moved throug all instances. This allows overriding of results or passing data to other (following) scripts. | |||
@@ -0,0 +1,111 @@ | |||
using System; | |||
using System.IO; | |||
using System.Net.Http.Headers; | |||
using System.Security.Claims; | |||
using System.Text; | |||
using System.Text.Encodings.Web; | |||
using System.Threading.Tasks; | |||
using IronPython.Runtime; | |||
using Microsoft.AspNetCore.Authentication; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using Microsoft.Net.Http.Headers; | |||
using Microsoft.Scripting; | |||
namespace MQTTnet.Server.Web | |||
{ | |||
public class AuthenticationHandler : AuthenticationHandler<AuthenticationSchemeOptions> | |||
{ | |||
private readonly ILogger<AuthenticationHandler> _logger; | |||
public AuthenticationHandler(IOptionsMonitor<AuthenticationSchemeOptions> options, ILoggerFactory loggerFactory, UrlEncoder encoder, ISystemClock clock, ILogger<AuthenticationHandler> logger) | |||
: base(options, loggerFactory, encoder, clock) | |||
{ | |||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |||
} | |||
protected override async Task<AuthenticateResult> HandleAuthenticateAsync() | |||
{ | |||
await Task.CompletedTask; | |||
if (!Request.Headers.ContainsKey(HeaderNames.Authorization)) | |||
{ | |||
return AuthenticateResult.Fail("Missing Authorization Header"); | |||
} | |||
try | |||
{ | |||
var headerValue = Request.Headers[HeaderNames.Authorization]; | |||
var parsedHeaderValue = AuthenticationHeaderValue.Parse(Request.Headers[HeaderNames.Authorization]); | |||
var scheme = parsedHeaderValue.Scheme; | |||
var parameter = parsedHeaderValue.Parameter; | |||
string username = null; | |||
string password = null; | |||
if (scheme.Equals("Basic", StringComparison.OrdinalIgnoreCase)) | |||
{ | |||
var credentialBytes = Convert.FromBase64String(parsedHeaderValue.Parameter); | |||
var credentials = Encoding.UTF8.GetString(credentialBytes).Split(':'); | |||
username = credentials[0]; | |||
password = credentials[1]; | |||
} | |||
var context = new PythonDictionary | |||
{ | |||
["header_value"] = headerValue, | |||
["scheme"] = scheme, | |||
["parameter"] = parameter, | |||
["username"] = username, | |||
["password"] = password, | |||
["is_authenticated"] = false | |||
}; | |||
if (!ValidateUser(context)) | |||
{ | |||
return AuthenticateResult.Fail("Invalid credentials."); | |||
} | |||
var claims = new[] | |||
{ | |||
new Claim(ClaimTypes.NameIdentifier, context.get("username") as string ?? string.Empty) | |||
}; | |||
var identity = new ClaimsIdentity(claims, Scheme.Name); | |||
var principal = new ClaimsPrincipal(identity); | |||
var ticket = new AuthenticationTicket(principal, Scheme.Name); | |||
return AuthenticateResult.Success(ticket); | |||
} | |||
catch (Exception exception) | |||
{ | |||
_logger.LogWarning("Error while authenticating user.", exception); | |||
return AuthenticateResult.Fail(exception); | |||
} | |||
} | |||
private bool ValidateUser(PythonDictionary context) | |||
{ | |||
var handlerPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Web", "authorization_handler.py"); | |||
if (!File.Exists(handlerPath)) | |||
{ | |||
return false; | |||
} | |||
var code = File.ReadAllText(handlerPath); | |||
var scriptEngine = IronPython.Hosting.Python.CreateEngine(); | |||
//scriptEngine.Runtime.IO.SetOutput(new PythonIOStream(_logger.), Encoding.UTF8); | |||
var scriptScope = scriptEngine.CreateScope(); | |||
var scriptSource = scriptScope.Engine.CreateScriptSourceFromString(code, SourceCodeKind.File); | |||
var compiledCode = scriptSource.Compile(); | |||
compiledCode.Execute(scriptScope); | |||
var function = scriptScope.Engine.Operations.GetMember<PythonFunction>(scriptScope, "handle_authenticate"); | |||
scriptScope.Engine.Operations.Invoke(function, context); | |||
return context.get("is_authenticated", false) as bool? == true; | |||
} | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using System; | |||
using System.IO; | |||
using System.Text; | |||
using Microsoft.AspNetCore.Http; | |||
namespace MQTTnet.Server.Web | |||
{ | |||
public static class Extensions | |||
{ | |||
public static string ReadBodyAsString(this HttpRequest request) | |||
{ | |||
if (request == null) throw new ArgumentNullException(nameof(request)); | |||
using (var reader = new StreamReader(request.Body, Encoding.UTF8)) | |||
{ | |||
return reader.ReadToEnd(); | |||
} | |||
} | |||
} | |||
} |
@@ -1,10 +1,13 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using Microsoft.AspNetCore.Authentication; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.AspNetCore.Hosting; | |||
using Microsoft.AspNetCore.Mvc; | |||
using Microsoft.EntityFrameworkCore.Internal; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Net.Http.Headers; | |||
using Microsoft.OpenApi.Models; | |||
using Microsoft.Scripting.Utils; | |||
using MQTTnet.Server.Configuration; | |||
@@ -14,7 +17,7 @@ using MQTTnet.Server.Scripting; | |||
using MQTTnet.Server.Scripting.DataSharing; | |||
using Swashbuckle.AspNetCore.SwaggerUI; | |||
namespace MQTTnet.Server | |||
namespace MQTTnet.Server.Web | |||
{ | |||
public class Startup | |||
{ | |||
@@ -35,7 +38,7 @@ namespace MQTTnet.Server | |||
MqttServerService mqttServerService, | |||
PythonScriptHostService pythonScriptHostService, | |||
DataSharingService dataSharingService, | |||
SettingsModel settings) | |||
MqttSettingsModel mqttSettings) | |||
{ | |||
if (environment.IsDevelopment()) | |||
{ | |||
@@ -46,12 +49,20 @@ namespace MQTTnet.Server | |||
application.UseHsts(); | |||
} | |||
application.UseCors(x => x | |||
.AllowAnyOrigin() | |||
.AllowAnyMethod() | |||
.AllowAnyHeader() | |||
.AllowCredentials()); | |||
application.UseAuthentication(); | |||
application.UseStaticFiles(); | |||
application.UseHttpsRedirection(); | |||
application.UseMvc(); | |||
ConfigureWebSocketEndpoint(application, mqttServerService, settings); | |||
ConfigureWebSocketEndpoint(application, mqttServerService, mqttSettings); | |||
dataSharingService.Configure(); | |||
pythonScriptHostService.Configure(); | |||
@@ -73,6 +84,8 @@ namespace MQTTnet.Server | |||
public void ConfigureServices(IServiceCollection services) | |||
{ | |||
services.AddCors(); | |||
services.AddMvc() | |||
.SetCompatibilityVersion(CompatibilityVersion.Version_2_2) | |||
.AddJsonOptions(options => | |||
@@ -80,7 +93,7 @@ namespace MQTTnet.Server | |||
options.SerializerSettings.Converters.Add(new Newtonsoft.Json.Converters.StringEnumConverter()); | |||
}); | |||
services.AddSingleton(ReadSettings()); | |||
ReadMqttSettings(services); | |||
services.AddSingleton<PythonIOStream>(); | |||
services.AddSingleton<PythonScriptHostService>(); | |||
@@ -98,10 +111,21 @@ namespace MQTTnet.Server | |||
services.AddSingleton<MqttConnectionValidator>(); | |||
services.AddSingleton<MqttSubscriptionInterceptor>(); | |||
services.AddSingleton<MqttApplicationMessageInterceptor>(); | |||
services.AddSwaggerGen(c => | |||
{ | |||
c.DescribeAllEnumsAsStrings(); | |||
c.AddSecurityDefinition("Basic", new OpenApiSecurityScheme | |||
{ | |||
Scheme = "Basic", | |||
Name = HeaderNames.Authorization, | |||
Type = SecuritySchemeType.Http, | |||
In = ParameterLocation.Header | |||
}); | |||
c.AddSecurityRequirement(new OpenApiSecurityRequirement | |||
{ | |||
[new OpenApiSecurityScheme { Name = "Basic" }] = new List<string>() | |||
}); | |||
c.SwaggerDoc("v1", new OpenApiInfo | |||
{ | |||
Title = "MQTTnet.Server API", | |||
@@ -120,46 +144,52 @@ namespace MQTTnet.Server | |||
}, | |||
}); | |||
}); | |||
services.AddAuthentication("Basic").AddScheme<AuthenticationSchemeOptions, AuthenticationHandler>("Basic", null); | |||
} | |||
private SettingsModel ReadSettings() | |||
private void ReadMqttSettings(IServiceCollection services) | |||
{ | |||
var settings = new Configuration.SettingsModel(); | |||
Configuration.Bind("MQTT", settings); | |||
return settings; | |||
var mqttSettings = new MqttSettingsModel(); | |||
Configuration.Bind("MQTT", mqttSettings); | |||
services.AddSingleton(mqttSettings); | |||
var scriptingSettings = new ScriptingSettingsModel(); | |||
Configuration.Bind("Scripting", scriptingSettings); | |||
services.AddSingleton(scriptingSettings); | |||
} | |||
private static void ConfigureWebSocketEndpoint( | |||
IApplicationBuilder application, | |||
MqttServerService mqttServerService, | |||
SettingsModel settings) | |||
MqttSettingsModel mqttSettings) | |||
{ | |||
if (settings?.WebSocketEndPoint?.Enabled != true) | |||
if (mqttSettings?.WebSocketEndPoint?.Enabled != true) | |||
{ | |||
return; | |||
} | |||
if (string.IsNullOrEmpty(settings.WebSocketEndPoint.Path)) | |||
if (string.IsNullOrEmpty(mqttSettings.WebSocketEndPoint.Path)) | |||
{ | |||
return; | |||
} | |||
var webSocketOptions = new WebSocketOptions | |||
{ | |||
KeepAliveInterval = TimeSpan.FromSeconds(settings.WebSocketEndPoint.KeepAliveInterval), | |||
ReceiveBufferSize = settings.WebSocketEndPoint.ReceiveBufferSize | |||
KeepAliveInterval = TimeSpan.FromSeconds(mqttSettings.WebSocketEndPoint.KeepAliveInterval), | |||
ReceiveBufferSize = mqttSettings.WebSocketEndPoint.ReceiveBufferSize | |||
}; | |||
if (settings.WebSocketEndPoint.AllowedOrigins?.Any() == true) | |||
if (mqttSettings.WebSocketEndPoint.AllowedOrigins?.Any() == true) | |||
{ | |||
webSocketOptions.AllowedOrigins.AddRange(settings.WebSocketEndPoint.AllowedOrigins); | |||
webSocketOptions.AllowedOrigins.AddRange(mqttSettings.WebSocketEndPoint.AllowedOrigins); | |||
} | |||
application.UseWebSockets(webSocketOptions); | |||
application.Use(async (context, next) => | |||
{ | |||
if (context.Request.Path == settings.WebSocketEndPoint.Path) | |||
if (context.Request.Path == mqttSettings.WebSocketEndPoint.Path) | |||
{ | |||
if (context.WebSockets.IsWebSocketRequest) | |||
{ |
@@ -0,0 +1,10 @@ | |||
def handle_authenticate(context): | |||
""" | |||
This function is invoked whenever a user tries to access protected HTTP resources. | |||
This function must exist and return a proper value. Otherwise the request is denied. | |||
""" | |||
username = context["username"] | |||
password = context["password"] | |||
context["is_authenticated"] = True # Change this to _False_ in case of invalid credentials. |
@@ -37,15 +37,22 @@ | |||
"AllowedOrigins": [] // List of strings with URLs. | |||
}, | |||
"CommunicationTimeout": 15, // In seconds. | |||
"ConnectionBacklog": 0, /* Set 0 to disable */ | |||
"ConnectionBacklog": 10, // Set 0 to disable | |||
"EnablePersistentSessions": false, | |||
"MaxPendingMessagesPerClient": 250, | |||
"RetainedApplicationMessages": { | |||
"Persist": true, | |||
"Filename": "RetainedApplicationMessages.json", | |||
"Persist": false, | |||
"Path": "RetainedApplicationMessages.json", | |||
"WriteInterval": 10 // In seconds. | |||
}, | |||
"EnableDebugLogging": true | |||
"EnableDebugLogging": false | |||
}, | |||
"Scripting": { | |||
"IncludePaths": [ | |||
"Lib", | |||
"/usr/lib/python2.7", | |||
"C:\\Python27\\Lib" | |||
] | |||
}, | |||
"Logging": { | |||
"LogLevel": { | |||
@@ -1,2 +0,0 @@ | |||
@echo off | |||
START "MQTTnet Server" dotnet .\MQTTnet.Server.dll |
@@ -1,3 +0,0 @@ | |||
#!/bin/bash | |||
echo "Starting MQTTnet Server.." | |||
dotnet .\MQTTnet.Server.dll |
@@ -12,7 +12,7 @@ | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" /> | |||
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||