Browse Source

Add first classes for persisted session support.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
ff22c34bb2
5 changed files with 41 additions and 6 deletions
  1. +3
    -1
      Source/MQTTnet/Server/IMqttServerOptions.cs
  2. +22
    -0
      Source/MQTTnet/Server/IMqttServerPersistedSession.cs
  3. +11
    -0
      Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs
  4. +1
    -1
      Source/MQTTnet/Server/MqttClientConnection.cs
  5. +4
    -4
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs

+ 3
- 1
Source/MQTTnet/Server/IMqttServerOptions.cs View File

@@ -21,6 +21,8 @@ namespace MQTTnet.Server
MqttServerTcpEndpointOptions DefaultEndpointOptions { get; } MqttServerTcpEndpointOptions DefaultEndpointOptions { get; }
MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; } MqttServerTlsTcpEndpointOptions TlsEndpointOptions { get; }


IMqttServerStorage Storage { get; }
IMqttServerStorage Storage { get; }

} }
} }

+ 22
- 0
Source/MQTTnet/Server/IMqttServerPersistedSession.cs View File

@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;

namespace MQTTnet.Server
{
public interface IMqttServerPersistedSession
{
string ClientId { get; }

IDictionary<object, object> Items { get; }

IList<TopicFilter> Subscriptions { get; }

MqttApplicationMessage WillMessage { get; }

uint? WillDelayInterval { get; }

DateTime? SessionExpiryTimestamp { get; }

IList<MqttQueuedApplicationMessage> PendingApplicationMessages { get; }
}
}

+ 11
- 0
Source/MQTTnet/Server/IMqttServerPersistedSessionsStorage.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerPersistedSessionsStorage
{
Task<IList<IMqttServerPersistedSession>> LoadPersistedSessionsAsync();
}
}

+ 1
- 1
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -229,7 +229,7 @@ namespace MQTTnet.Server
} }
else else
{ {
_logger.Error(exception, "Client '{0}': Unhandled exception while receiving client packets.", ClientId);
_logger.Error(exception, "Client '{0}': Error while receiving client packets.", ClientId);
} }


StopInternal(); StopInternal();


+ 4
- 4
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -9,7 +9,7 @@ namespace MQTTnet.Server
{ {
public class MqttClientSubscriptionsManager public class MqttClientSubscriptionsManager
{ {
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly Dictionary<string, TopicFilter> _subscriptions = new Dictionary<string, TopicFilter>();
private readonly MqttClientSession _clientSession; private readonly MqttClientSession _clientSession;
private readonly IMqttServerOptions _serverOptions; private readonly IMqttServerOptions _serverOptions;
private readonly MqttServerEventDispatcher _eventDispatcher; private readonly MqttServerEventDispatcher _eventDispatcher;
@@ -64,7 +64,7 @@ namespace MQTTnet.Server
{ {
lock (_subscriptions) lock (_subscriptions)
{ {
_subscriptions[finalTopicFilter.Topic] = finalTopicFilter.QualityOfServiceLevel;
_subscriptions[finalTopicFilter.Topic] = finalTopicFilter;
} }


await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false); await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, finalTopicFilter).ConfigureAwait(false);
@@ -90,7 +90,7 @@ namespace MQTTnet.Server
{ {
lock (_subscriptions) lock (_subscriptions)
{ {
_subscriptions[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
_subscriptions[topicFilter.Topic] = topicFilter;
} }


await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false); await _eventDispatcher.HandleClientSubscribedTopicAsync(_clientSession.ClientId, topicFilter).ConfigureAwait(false);
@@ -158,7 +158,7 @@ namespace MQTTnet.Server
continue; continue;
} }


qosLevels.Add(subscription.Value);
qosLevels.Add(subscription.Value.QualityOfServiceLevel);
} }
} }




Loading…
Cancel
Save