Explorar el Código

Added interceptor for undelivered messages.

release/3.x.x
SeppPenner hace 4 años
padre
commit
75dfe68a94
Se han modificado 8 ficheros con 56 adiciones y 1 borrados
  1. +2
    -1
      Build/MQTTnet.nuspec
  2. +2
    -0
      Source/MQTTnet/MqttApplicationMessage.cs
  3. +2
    -0
      Source/MQTTnet/Server/IMqttServerOptions.cs
  4. +2
    -0
      Source/MQTTnet/Server/MqttClientSession.cs
  5. +12
    -0
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  6. +2
    -0
      Source/MQTTnet/Server/MqttServerOptions.cs
  7. +6
    -0
      Source/MQTTnet/Server/MqttServerOptionsBuilder.cs
  8. +28
    -0
      Tests/MQTTnet.Core.Tests/Server_Tests.cs

+ 2
- 1
Build/MQTTnet.nuspec Ver fichero

@@ -16,7 +16,8 @@
* [AspNetCore] Adjusted some namespaces (BREAKING CHANGE!)
* [Server] Adjusted some namespaces (BREAKING CHANGE!)
* [Server] Added state checks (throw if not started etc.) for most server APIs.
* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (thanks to @borigas).
* [Server] Exposed real X509Certificate2 (instead byte array) to TLS options (Thanks to @borigas).
* [Core] Added server interceptor for undelivered messages (Thanks to @cshark-inator).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2020</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin Blazor</tags>


+ 2
- 0
Source/MQTTnet/MqttApplicationMessage.cs Ver fichero

@@ -29,5 +29,7 @@ namespace MQTTnet
public byte[] CorrelationData { get; set; }

public List<uint> SubscriptionIdentifiers { get; set; }

public uint DeliveryCount { get; set; }
}
}

+ 2
- 0
Source/MQTTnet/Server/IMqttServerOptions.cs Ver fichero

@@ -25,5 +25,7 @@ namespace MQTTnet.Server
IMqttServerStorage Storage { get; }

IMqttRetainedMessagesManager RetainedMessagesManager { get; }

IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; }
}
}

+ 2
- 0
Source/MQTTnet/Server/MqttClientSession.cs Ver fichero

@@ -56,6 +56,8 @@ namespace MQTTnet.Server

_logger.Verbose("Queued application message with topic '{0}' (ClientId: {1}).", applicationMessage.Topic, ClientId);

applicationMessage.DeliveryCount++;

ApplicationMessagesQueue.Enqueue(applicationMessage, senderClientId, checkSubscriptionsResult.QualityOfServiceLevel, isRetainedApplicationMessage);
}



+ 12
- 0
Source/MQTTnet/Server/MqttClientSessionsManager.cs Ver fichero

@@ -224,6 +224,8 @@ namespace MQTTnet.Server
await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);
}

applicationMessage.DeliveryCount = 0;

foreach (var clientSession in _sessions.Values)
{
clientSession.EnqueueApplicationMessage(
@@ -231,6 +233,16 @@ namespace MQTTnet.Server
sender?.ClientId,
false);
}

if (applicationMessage.DeliveryCount == 0)
{
if (_options.UndeliveredMessageInterceptor == null)
{
throw new OperationCanceledException(nameof(_options.UndeliveredMessageInterceptor));
}

await _options.UndeliveredMessageInterceptor.InterceptApplicationMessagePublishAsync(new MqttApplicationMessageInterceptorContext(sender?.ClientId, sender?.Session?.Items, applicationMessage));
}
}
catch (OperationCanceledException)
{


+ 2
- 0
Source/MQTTnet/Server/MqttServerOptions.cs Ver fichero

@@ -31,5 +31,7 @@ namespace MQTTnet.Server
public IMqttServerStorage Storage { get; set; }

public IMqttRetainedMessagesManager RetainedMessagesManager { get; set; } = new MqttRetainedMessagesManager();

public IMqttServerApplicationMessageInterceptor UndeliveredMessageInterceptor { get; set; }
}
}

+ 6
- 0
Source/MQTTnet/Server/MqttServerOptionsBuilder.cs Ver fichero

@@ -212,5 +212,11 @@ namespace MQTTnet.Server
{
return _options;
}

public MqttServerOptionsBuilder WithUndeliveredMessageInterceptor(Action<MqttApplicationMessageInterceptorContext> value)
{
_options.UndeliveredMessageInterceptor = new MqttServerApplicationMessageInterceptorDelegate(value);
return this;
}
}
}

+ 28
- 0
Tests/MQTTnet.Core.Tests/Server_Tests.cs Ver fichero

@@ -1384,5 +1384,33 @@ namespace MQTTnet.Tests
Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
}
}

[TestMethod]
public async Task Intercept_Undelivered()
{
using (var testEnvironment = new TestEnvironment())
{
var undeliverd = string.Empty;
var svr = await testEnvironment.StartServerAsync(new MqttServerOptionsBuilder().WithUndeliveredMessageInterceptor(
context =>
{
undeliverd = context.ApplicationMessage.Topic;
}));

var topicAReceived = false;
var topicBReceived = false;

var client = await testEnvironment.ConnectClientAsync();

await client.SubscribeAsync("b");

await client.PublishAsync("a", null, MqttQualityOfServiceLevel.ExactlyOnce);

await Task.Delay(500);

Assert.AreEqual(undeliverd, "a");

}
}
}
}

Cargando…
Cancelar
Guardar