Parcourir la source

Add new multi threaded application message handler for server.

release/3.x.x
Christian il y a 4 ans
Parent
révision
f92cfcd293
7 fichiers modifiés avec 105 ajouts et 25 suppressions
  1. +2
    -0
      Source/MQTTnet/Server/CheckSubscriptionsResult.cs
  2. +11
    -4
      Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs
  3. +17
    -9
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  4. +1
    -4
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  5. +2
    -2
      Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs
  6. +46
    -0
      Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs
  7. +26
    -6
      Source/MQTTnet/Server/MqttServerOptionsBuilder.cs

+ 2
- 0
Source/MQTTnet/Server/CheckSubscriptionsResult.cs Voir le fichier

@@ -4,6 +4,8 @@ namespace MQTTnet.Server
{
public struct CheckSubscriptionsResult
{
public static CheckSubscriptionsResult NotSubscribed = new CheckSubscriptionsResult();

public bool IsSubscribed { get; set; }

public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }


+ 11
- 4
Source/MQTTnet/Server/MqttApplicationMessageInterceptorContext.cs Voir le fichier

@@ -1,16 +1,23 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public class MqttApplicationMessageInterceptorContext
public sealed class MqttApplicationMessageInterceptorContext
{
public MqttApplicationMessageInterceptorContext(string clientId, IDictionary<object, object> sessionItems, MqttApplicationMessage applicationMessage)
public MqttApplicationMessageInterceptorContext(string clientId, IDictionary<object, object> sessionItems, IMqttNetScopedLogger logger)
{
ClientId = clientId;
ApplicationMessage = applicationMessage;
SessionItems = sessionItems;
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

/// <summary>
/// Gets the currently used logger.
/// </summary>
public IMqttNetScopedLogger Logger { get; }

public string ClientId { get; }

public MqttApplicationMessage ApplicationMessage { get; set; }


+ 17
- 9
Source/MQTTnet/Server/MqttClientSessionsManager.cs Voir le fichier

@@ -322,19 +322,21 @@ namespace MQTTnet.Server
}

var deliveryCount = 0;
List<MqttClientSession> sessions;
lock (_sessions)
{
foreach (var clientSession in _sessions.Values)
sessions = _sessions.Values.ToList();
}

foreach (var clientSession in sessions)
{
var isSubscribed = clientSession.EnqueueApplicationMessage(applicationMessage, senderClientId, false);
if (isSubscribed)
{
var isSubscribed = clientSession.EnqueueApplicationMessage(applicationMessage, senderClientId, false);
if (isSubscribed)
{
deliveryCount++;
}
deliveryCount++;
}
}

if (deliveryCount == 0)
{
var undeliveredMessageInterceptor = _options.UndeliveredMessageInterceptor;
@@ -445,7 +447,13 @@ namespace MQTTnet.Server
sessionItems = clientConnection.Session.Items;
}

var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, applicationMessage);
var interceptorContext = new MqttApplicationMessageInterceptorContext(senderClientId, sessionItems, _logger)
{
AcceptPublish = true,
ApplicationMessage = applicationMessage,
CloseConnection = false
};

await interceptor.InterceptApplicationMessagePublishAsync(interceptorContext).ConfigureAwait(false);
return interceptorContext;
}


+ 1
- 4
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs Voir le fichier

@@ -193,10 +193,7 @@ namespace MQTTnet.Server

if (qosLevels.Count == 0)
{
return new CheckSubscriptionsResult
{
IsSubscribed = false
};
return CheckSubscriptionsResult.NotSubscribed;
}

return CreateSubscriptionResult(qosLevel, qosLevels);


+ 2
- 2
Source/MQTTnet/Server/MqttServerApplicationMessageInterceptorDelegate.cs Voir le fichier

@@ -3,9 +3,9 @@ using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor
public sealed class MqttServerApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor
{
private readonly Func<MqttApplicationMessageInterceptorContext, Task> _callback;
readonly Func<MqttApplicationMessageInterceptorContext, Task> _callback;

public MqttServerApplicationMessageInterceptorDelegate(Action<MqttApplicationMessageInterceptorContext> callback)
{


+ 46
- 0
Source/MQTTnet/Server/MqttServerMultiThreadedApplicationMessageInterceptorDelegate.cs Voir le fichier

@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Diagnostics;
using MQTTnet.Implementations;
using MQTTnet.Internal;

namespace MQTTnet.Server
{
public sealed class MqttServerMultiThreadedApplicationMessageInterceptorDelegate : IMqttServerApplicationMessageInterceptor
{
readonly Func<MqttApplicationMessageInterceptorContext, Task> _callback;

public MqttServerMultiThreadedApplicationMessageInterceptorDelegate(Action<MqttApplicationMessageInterceptorContext> callback)
{
if (callback == null) throw new ArgumentNullException(nameof(callback));

_callback = context =>
{
callback(context);
return Task.FromResult(0);
};
}

public MqttServerMultiThreadedApplicationMessageInterceptorDelegate(Func<MqttApplicationMessageInterceptorContext, Task> callback)
{
_callback = callback ?? throw new ArgumentNullException(nameof(callback));
}

public Task InterceptApplicationMessagePublishAsync(MqttApplicationMessageInterceptorContext context)
{
Task.Run(async () =>
{
try
{
await _callback.Invoke(context).ConfigureAwait(false);
}
catch (Exception exception)
{
context.Logger.Error(exception, "Error while intercepting application message.");
}
}).RunInBackground();

return PlatformAbstractionLayer.CompletedTask;
}
}
}

+ 26
- 6
Source/MQTTnet/Server/MqttServerOptionsBuilder.cs Voir le fichier

@@ -3,11 +3,13 @@ using System.Net;
using System.Net.Security;
using System.Security.Authentication;
using MQTTnet.Certificates;
using System.Threading.Tasks;

#if !WINDOWS_UWP
using System.Security.Cryptography.X509Certificates;
#endif

// ReSharper disable UnusedMember.Global
namespace MQTTnet.Server
{
public class MqttServerOptionsBuilder
@@ -187,6 +189,24 @@ namespace MQTTnet.Server
return this;
}

public MqttServerOptionsBuilder WithApplicationMessageInterceptor(Func<MqttApplicationMessageInterceptorContext, Task> value)
{
_options.ApplicationMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
return this;
}

public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Action<MqttApplicationMessageInterceptorContext> value)
{
_options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value);
return this;
}

public MqttServerOptionsBuilder WithMultiThreadedApplicationMessageInterceptor(Func<MqttApplicationMessageInterceptorContext, Task> value)
{
_options.ApplicationMessageInterceptor = new MqttServerMultiThreadedApplicationMessageInterceptorDelegate(value);
return this;
}

public MqttServerOptionsBuilder WithClientMessageQueueInterceptor(IMqttServerClientMessageQueueInterceptor value)
{
_options.ClientMessageQueueInterceptor = value;
@@ -217,6 +237,12 @@ namespace MQTTnet.Server
return this;
}

public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action<MqttApplicationMessageInterceptorContext> value)
{
_options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
return this;
}

public MqttServerOptionsBuilder WithDefaultEndpointReuseAddress()
{
_options.DefaultEndpointOptions.ReuseAddress = true;
@@ -248,11 +274,5 @@ namespace MQTTnet.Server
{
return _options;
}

public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action<MqttApplicationMessageInterceptorContext> value)
{
_options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
return this;
}
}
}

Chargement…
Annuler
Enregistrer