Procházet zdrojové kódy

[Server] Add new interceptor before messages are added to a queue.

release/3.x.x
Christian Kratky před 6 roky
rodič
revize
c88b46783b
7 změnil soubory, kde provedl 46 přidání a 8 odebrání
  1. +1
    -0
      Build/MQTTnet.nuspec
  2. +1
    -0
      Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs
  3. +20
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttClientMessageQueueInterceptorContext.cs
  4. +1
    -2
      Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs
  5. +20
    -5
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs
  6. +1
    -1
      Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs
  7. +2
    -0
      Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs

+ 1
- 0
Build/MQTTnet.nuspec Zobrazit soubor

@@ -24,6 +24,7 @@
* [Server] Keep alive checking is now suspended while large packages are being received (and thus the client is connected). Keep alive checking continues after a large packet is received completely.
* [Server] Rewritten the _ConnectedClients_ API and added new features for disconnecting and Endpoint information (IP etc.).
* [Server] Added settings for disabling persistent sessions and defining a max pending messages queue size per session.
* [Server] Added a new interceptor which is invoked before a new message is added to the client queue.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 1
- 0
Frameworks/MQTTnet.NetStandard/Server/IMqttServerOptions.cs Zobrazit soubor

@@ -16,6 +16,7 @@ namespace MQTTnet.Server
Action<MqttConnectionValidatorContext> ConnectionValidator { get; }
Action<MqttSubscriptionInterceptorContext> SubscriptionInterceptor { get; }
Action<MqttApplicationMessageInterceptorContext> ApplicationMessageInterceptor { get; }
Action<MqttClientMessageQueueInterceptorContext> ClientMessageQueueInterceptor { get; set; }

MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; }
MqttServerTlsEndpointOptions TlsEndpointOptions { get; }


+ 20
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttClientMessageQueueInterceptorContext.cs Zobrazit soubor

@@ -0,0 +1,20 @@
namespace MQTTnet.Server
{
public class MqttClientMessageQueueInterceptorContext
{
public MqttClientMessageQueueInterceptorContext(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage)
{
SenderClientId = senderClientId;
ReceiverClientId = receiverClientId;
ApplicationMessage = applicationMessage;
}

public string SenderClientId { get; }

public string ReceiverClientId { get; }

public MqttApplicationMessage ApplicationMessage { get; set; }

public bool AcceptEnqueue { get; set; } = true;
}
}

+ 1
- 2
Frameworks/MQTTnet.NetStandard/Server/MqttClientPendingMessagesQueue.cs Zobrazit soubor

@@ -19,7 +19,6 @@ namespace MQTTnet.Server
private readonly IMqttNetChildLogger _logger;

private ConcurrentQueue<MqttBasePacket> _queue = new ConcurrentQueue<MqttBasePacket>();
private Task _workerTask;

public MqttClientPendingMessagesQueue(IMqttServerOptions options, MqttClientSession clientSession, IMqttNetChildLogger logger)
{
@@ -41,7 +40,7 @@ namespace MQTTnet.Server
return;
}

_workerTask = Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
Task.Run(() => SendQueuedPacketsAsync(adapter, cancellationToken), cancellationToken);
}
public void Enqueue(MqttBasePacket packet)


+ 20
- 5
Frameworks/MQTTnet.NetStandard/Server/MqttClientSession.cs Zobrazit soubor

@@ -153,24 +153,39 @@ namespace MQTTnet.Server
}
}

public void EnqueueApplicationMessage(MqttApplicationMessage applicationMessage)
public void EnqueueApplicationMessage(MqttClientSession senderClientSession, MqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

var result = _subscriptionsManager.CheckSubscriptions(applicationMessage);
if (!result.IsSubscribed)
var checkSubscriptionsResult = _subscriptionsManager.CheckSubscriptions(applicationMessage);
if (!checkSubscriptionsResult.IsSubscribed)
{
return;
}

var publishPacket = applicationMessage.ToPublishPacket();
publishPacket.QualityOfServiceLevel = result.QualityOfServiceLevel;
publishPacket.QualityOfServiceLevel = checkSubscriptionsResult.QualityOfServiceLevel;

if (publishPacket.QualityOfServiceLevel > 0)
{
publishPacket.PacketIdentifier = _packetIdentifierProvider.GetNewPacketIdentifier();
}

if (_options.ClientMessageQueueInterceptor != null)
{
var context = new MqttClientMessageQueueInterceptorContext(
senderClientSession?.ClientId,
ClientId,
publishPacket.ToApplicationMessage());
_options.ClientMessageQueueInterceptor?.Invoke(context);

if (!context.AcceptEnqueue || context.ApplicationMessage == null)
{
return;
}
}
_pendingMessagesQueue.Enqueue(publishPacket);
}

@@ -276,7 +291,7 @@ namespace MQTTnet.Server
var retainedMessages = _retainedMessagesManager.GetSubscribedMessages(topicFilters);
foreach (var applicationMessage in retainedMessages)
{
EnqueueApplicationMessage(applicationMessage);
EnqueueApplicationMessage(null, applicationMessage);
}
}



+ 1
- 1
Frameworks/MQTTnet.NetStandard/Server/MqttClientSessionsManager.cs Zobrazit soubor

@@ -257,7 +257,7 @@ namespace MQTTnet.Server

foreach (var clientSession in _sessions.Values)
{
clientSession.EnqueueApplicationMessage(applicationMessage);
clientSession.EnqueueApplicationMessage(senderClientSession, applicationMessage);
}
}
catch (Exception exception)


+ 2
- 0
Frameworks/MQTTnet.NetStandard/Server/MqttServerOptions.cs Zobrazit soubor

@@ -21,6 +21,8 @@ namespace MQTTnet.Server

public Action<MqttApplicationMessageInterceptorContext> ApplicationMessageInterceptor { get; set; }

public Action<MqttClientMessageQueueInterceptorContext> ClientMessageQueueInterceptor { get; set; }

public Action<MqttSubscriptionInterceptorContext> SubscriptionInterceptor { get; set; }

public IMqttServerStorage Storage { get; set; }


Načítá se…
Zrušit
Uložit