ソースを参照

Expose more properties to connection validator context.

release/3.x.x
Christian Kratky 5年前
コミット
bd665c3d56
12個のファイルの変更302行の追加67行の削除
  1. +2
    -2
      MQTTnet.sln
  2. +1
    -1
      README.md
  3. +4
    -0
      Source/MQTTnet/Formatter/IMqttDataConverter.cs
  4. +12
    -0
      Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs
  5. +19
    -0
      Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs
  6. +91
    -0
      Source/MQTTnet/Protocol/MqttConnectReasonCodeConverter.cs
  7. +32
    -27
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  8. +69
    -24
      Source/MQTTnet/Server/MqttConnectionValidatorContext.cs
  9. +3
    -1
      Source/MQTTnet/Server/MqttServer.cs
  10. +23
    -7
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  11. +40
    -0
      Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs
  12. +6
    -5
      Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs

+ 2
- 2
MQTTnet.sln ファイルの表示

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28307.645
# Visual Studio Version 16
VisualStudioVersion = 16.0.29009.5
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject


+ 1
- 1
README.md ファイルの表示

@@ -26,7 +26,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Uniform API across all supported versions of the MQTT protocol
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (~200 tests)
* Unit tested (~210 tests)
* No external dependencies

\* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_.


+ 4
- 0
Source/MQTTnet/Formatter/IMqttDataConverter.cs ファイルの表示

@@ -5,6 +5,8 @@ using MQTTnet.Client.Publishing;
using MQTTnet.Client.Subscribing;
using MQTTnet.Client.Unsubscribing;
using MQTTnet.Packets;
using MQTTnet.Server;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

namespace MQTTnet.Formatter
{
@@ -20,6 +22,8 @@ namespace MQTTnet.Formatter

MqttConnectPacket CreateConnectPacket(MqttApplicationMessage willApplicationMessage, IMqttClientOptions options);

MqttConnAckPacket CreateConnAckPacket(MqttConnectionValidatorContext connectionValidatorContext);

MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket);

MqttClientUnsubscribeResult CreateClientUnsubscribeResult(MqttUnsubscribePacket unsubscribePacket, MqttUnsubAckPacket unsubAckPacket);


+ 12
- 0
Source/MQTTnet/Formatter/V3/MqttV310DataConverter.cs ファイルの表示

@@ -9,6 +9,8 @@ using MQTTnet.Client.Unsubscribing;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

namespace MQTTnet.Formatter.V3
{
@@ -124,6 +126,16 @@ namespace MQTTnet.Formatter.V3
};
}

public MqttConnAckPacket CreateConnAckPacket(MqttConnectionValidatorContext connectionValidatorContext)
{
if (connectionValidatorContext == null) throw new ArgumentNullException(nameof(connectionValidatorContext));

return new MqttConnAckPacket
{
ReturnCode = new MqttConnectReasonCodeConverter().ToConnectReturnCode(connectionValidatorContext.ReasonCode)
};
}

public MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));


+ 19
- 0
Source/MQTTnet/Formatter/V5/MqttV500DataConverter.cs ファイルの表示

@@ -10,6 +10,9 @@ using MQTTnet.Client.Unsubscribing;
using MQTTnet.Exceptions;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;

using MqttClientSubscribeResult = MQTTnet.Client.Subscribing.MqttClientSubscribeResult;

namespace MQTTnet.Formatter.V5
{
@@ -129,6 +132,22 @@ namespace MQTTnet.Formatter.V5
};
}

public MqttConnAckPacket CreateConnAckPacket(MqttConnectionValidatorContext connectionValidatorContext)
{
return new MqttConnAckPacket
{
ReasonCode = connectionValidatorContext.ReasonCode,
Properties = new MqttConnAckPacketProperties
{
UserProperties = connectionValidatorContext.UserProperties,
AuthenticationMethod = connectionValidatorContext.AuthenticationMethod,
AuthenticationData = connectionValidatorContext.ResponseAuthenticationData,
AssignedClientIdentifier = connectionValidatorContext.AssignedClientIdentifier,
ReasonString = connectionValidatorContext.ReasonString
}
};
}

public MqttClientSubscribeResult CreateClientSubscribeResult(MqttSubscribePacket subscribePacket, MqttSubAckPacket subAckPacket)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));


+ 91
- 0
Source/MQTTnet/Protocol/MqttConnectReasonCodeConverter.cs ファイルの表示

@@ -0,0 +1,91 @@
using MQTTnet.Exceptions;

namespace MQTTnet.Protocol
{
public class MqttConnectReasonCodeConverter
{
public MqttConnectReturnCode ToConnectReturnCode(MqttConnectReasonCode reasonCode)
{
switch (reasonCode)
{
case MqttConnectReasonCode.Success:
{
return MqttConnectReturnCode.ConnectionAccepted;
}

case MqttConnectReasonCode.NotAuthorized:
{
return MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
}

case MqttConnectReasonCode.BadUserNameOrPassword:
{
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
}

case MqttConnectReasonCode.ClientIdentifierNotValid:
{
return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
}

case MqttConnectReasonCode.UnsupportedProtocolVersion:
{
return MqttConnectReturnCode.ConnectionRefusedUnacceptableProtocolVersion;
}

case MqttConnectReasonCode.ServerUnavailable:
case MqttConnectReasonCode.ServerBusy:
case MqttConnectReasonCode.ServerMoved:
{
return MqttConnectReturnCode.ConnectionRefusedServerUnavailable;
}

default:
{
throw new MqttProtocolViolationException("Unable to convert connect reason code (MQTTv5) to return code (MQTTv3).");
}
}
}

public MqttConnectReasonCode ToConnectReasonCode(MqttConnectReturnCode returnCode)
{
switch (returnCode)
{
case MqttConnectReturnCode.ConnectionAccepted:
{
return MqttConnectReasonCode.Success;
}

case MqttConnectReturnCode.ConnectionRefusedUnacceptableProtocolVersion:
{
return MqttConnectReasonCode.UnsupportedProtocolVersion;
}

case MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword:
{
return MqttConnectReasonCode.BadUserNameOrPassword;
}

case MqttConnectReturnCode.ConnectionRefusedIdentifierRejected:
{
return MqttConnectReasonCode.ClientIdentifierNotValid;
}

case MqttConnectReturnCode.ConnectionRefusedServerUnavailable:
{
return MqttConnectReasonCode.ServerUnavailable;
}

case MqttConnectReturnCode.ConnectionRefusedNotAuthorized:
{
return MqttConnectReasonCode.NotAuthorized;
}

default:
{
throw new MqttProtocolViolationException("Unable to convert connect reason code (MQTTv5) to return code (MQTTv3).");
}
}
}
}
}

+ 32
- 27
Source/MQTTnet/Server/MqttClientSessionsManager.cs ファイルの表示

@@ -5,6 +5,7 @@ using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
@@ -227,7 +228,7 @@ namespace MQTTnet.Server
private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
var disconnectType = MqttClientDisconnectType.NotClean;
var clientId = string.Empty;
string clientId = null;

try
{
@@ -242,20 +243,12 @@ namespace MQTTnet.Server

var validatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

if (validatorContext.ReturnCode != MqttConnectReturnCode.ConnectionAccepted)
if (validatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
// TODO: Move to channel adapter data converter.

// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
await channelAdapter.SendPacketAsync(
new MqttConnAckPacket
{
ReturnCode = validatorContext.ReturnCode,
ReasonCode = MqttConnectReasonCode.NotAuthorized
},
_options.DefaultCommunicationTimeout,
cancellationToken).ConfigureAwait(false);
var connAckPacket = channelAdapter.PacketFormatterAdapter.DataConverter.CreateConnAckPacket(validatorContext);
await channelAdapter.SendPacketAsync(connAckPacket, _options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false);

return;
}
@@ -275,39 +268,51 @@ namespace MQTTnet.Server
}
finally
{
_connections.TryRemove(clientId, out _);

if (!_options.EnablePersistentSessions)
if (clientId != null)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
_connections.TryRemove(clientId, out _);

if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
}

await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);

await _eventDispatcher.HandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
if (clientId != null)
{
await _eventDispatcher.TryHandleClientDisconnectedAsync(clientId, disconnectType).ConfigureAwait(false);
}
}
}

private async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter)
private async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
var context = new MqttConnectionValidatorContext(
connectPacket.ClientId,
connectPacket.Username,
connectPacket.Password,
connectPacket.WillMessage,
clientAdapter.Endpoint,
clientAdapter.IsSecureConnection,
clientAdapter.ClientCertificate);
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter);

var connectionValidator = _options.ConnectionValidator;

if (connectionValidator == null)
{
context.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
context.ReasonCode = MqttConnectReasonCode.Success;
return context;
}

await connectionValidator.ValidateConnectionAsync(context).ConfigureAwait(false);

// Check the client ID and set a random one if supported.
if (string.IsNullOrEmpty(connectPacket.ClientId) &&
channelAdapter.PacketFormatterAdapter.ProtocolVersion == MqttProtocolVersion.V500)
{
connectPacket.ClientId = context.AssignedClientIdentifier;
}

if (string.IsNullOrEmpty(connectPacket.ClientId))
{
context.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
}

return context;
}



+ 69
- 24
Source/MQTTnet/Server/MqttConnectionValidatorContext.cs ファイルの表示

@@ -1,45 +1,90 @@
using System.Security.Cryptography.X509Certificates;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet.Adapter;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Protocol;

namespace MQTTnet.Server
{
public class MqttConnectionValidatorContext
{
public MqttConnectionValidatorContext(
string clientId,
string username,
byte[] password,
MqttApplicationMessage willMessage,
string endpoint,
bool isSecureConnection,
X509Certificate2 clientCertificate)
private readonly MqttConnectPacket _connectPacket;
private readonly IMqttChannelAdapter _clientAdapter;

public MqttConnectionValidatorContext(MqttConnectPacket connectPacket, IMqttChannelAdapter clientAdapter)
{
ClientId = clientId;
Username = username;
RawPassword = password;
WillMessage = willMessage;
Endpoint = endpoint;
IsSecureConnection = isSecureConnection;
ClientCertificate = clientCertificate;
_connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket));
_clientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
}

public string ClientId { get; }
public string ClientId => _connectPacket.ClientId;

public string Username => _connectPacket.Username;

public string Username { get; }
public byte[] RawPassword => _connectPacket.Password;

public string Password => Encoding.UTF8.GetString(RawPassword ?? new byte[0]);

public byte[] RawPassword { get; }
public MqttApplicationMessage WillMessage => _connectPacket.WillMessage;

public bool CleanSession => _connectPacket.CleanSession;

public ushort KeepAlivePeriod => _connectPacket.KeepAlivePeriod;

public List<MqttUserProperty> UserProperties => _connectPacket.Properties?.UserProperties;

public byte[] AuthenticationData => _connectPacket.Properties?.AuthenticationData;

public string AuthenticationMethod => _connectPacket.Properties?.AuthenticationMethod;

public uint? MaximumPacketSize => _connectPacket.Properties?.MaximumPacketSize;

public ushort? ReceiveMaximum => _connectPacket.Properties?.ReceiveMaximum;

public ushort? TopicAliasMaximum => _connectPacket.Properties?.TopicAliasMaximum;

public bool? RequestProblemInformation => _connectPacket.Properties?.RequestProblemInformation;

public bool? RequestResponseInformation => _connectPacket.Properties?.RequestResponseInformation;

public uint? SessionExpiryInterval => _connectPacket.Properties?.SessionExpiryInterval;

public uint? WillDelayInterval => _connectPacket.Properties?.WillDelayInterval;

public string Endpoint => _clientAdapter.Endpoint;

public bool IsSecureConnection => _clientAdapter.IsSecureConnection;

public X509Certificate2 ClientCertificate => _clientAdapter.ClientCertificate;

public MqttProtocolVersion ProtocolVersion => _clientAdapter.PacketFormatterAdapter.ProtocolVersion;

/// <summary>
/// This is used for MQTTv3 only.
/// </summary>
[Obsolete("Use ReasonCode instead. It is MQTTv5 only but will be converted to a valid ReturnCode.")]
public MqttConnectReturnCode ReturnCode
{
get => new MqttConnectReasonCodeConverter().ToConnectReturnCode(ReasonCode);
set => ReasonCode = new MqttConnectReasonCodeConverter().ToConnectReasonCode(value);
}

public MqttApplicationMessage WillMessage { get; }
/// <summary>
/// This is used for MQTTv5 only. When a MQTTv3 client connects the enum value must be one which is
/// also supported in MQTTv3. Otherwise the connection attempt will fail because not all codes can be
/// converted properly.
/// </summary>
public MqttConnectReasonCode ReasonCode { get; set; } = MqttConnectReasonCode.Success;

public string Endpoint { get; }
public List<MqttUserProperty> ResponseUserProperties { get; set; }

public bool IsSecureConnection { get; }
public byte[] ResponseAuthenticationData { get; set; }

public X509Certificate2 ClientCertificate { get; }
public string AssignedClientIdentifier { get; set; }

public MqttConnectReturnCode ReturnCode { get; set; } = MqttConnectReturnCode.ConnectionAccepted;
public string ReasonString { get; set; }
}
}

+ 3
- 1
Source/MQTTnet/Server/MqttServer.cs ファイルの表示

@@ -14,7 +14,7 @@ namespace MQTTnet.Server
{
public class MqttServer : IMqttServer
{
private readonly MqttServerEventDispatcher _eventDispatcher = new MqttServerEventDispatcher();
private readonly MqttServerEventDispatcher _eventDispatcher;
private readonly ICollection<IMqttServerAdapter> _adapters;
private readonly IMqttNetChildLogger _logger;

@@ -29,6 +29,8 @@ namespace MQTTnet.Server

if (logger == null) throw new ArgumentNullException(nameof(logger));
_logger = logger.CreateChildLogger(nameof(MqttServer));

_eventDispatcher = new MqttServerEventDispatcher(logger.CreateChildLogger(nameof(MqttServerEventDispatcher)));
}

public IMqttServerStartedHandler StartedHandler { get; set; }


+ 23
- 7
Source/MQTTnet/Server/MqttServerEventDispatcher.cs ファイルの表示

@@ -1,10 +1,19 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;

namespace MQTTnet.Server
{
public class MqttServerEventDispatcher
{
private readonly IMqttNetChildLogger _logger;

public MqttServerEventDispatcher(IMqttNetChildLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }

public IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }
@@ -26,15 +35,22 @@ namespace MQTTnet.Server
return handler.HandleClientConnectedAsync(new MqttServerClientConnectedEventArgs(clientId));
}

public Task HandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType)
public async Task TryHandleClientDisconnectedAsync(string clientId, MqttClientDisconnectType disconnectType)
{
var handler = ClientDisconnectedHandler;
if (handler == null)
try
{
return Task.FromResult(0);
}
var handler = ClientDisconnectedHandler;
if (handler == null)
{
return;
}

return handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType));
await handler.HandleClientDisconnectedAsync(new MqttServerClientDisconnectedEventArgs(clientId, disconnectType)).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.Error(exception, "Error while handling 'ClientDisconnected' event.");
}
}

public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter)


+ 40
- 0
Tests/MQTTnet.Core.Tests/Mockups/TestLogger.cs ファイルの表示

@@ -0,0 +1,40 @@
using System;
using MQTTnet.Diagnostics;

namespace MQTTnet.Tests.Mockups
{
public class TestLogger : IMqttNetLogger, IMqttNetChildLogger
{
public event EventHandler<MqttNetLogMessagePublishedEventArgs> LogMessagePublished;

IMqttNetChildLogger IMqttNetLogger.CreateChildLogger(string source)
{
return new MqttNetChildLogger(this, source);
}

IMqttNetChildLogger IMqttNetChildLogger.CreateChildLogger(string source)
{
return new MqttNetChildLogger(this, source);
}

public void Verbose(string message, params object[] parameters)
{
}

public void Info(string message, params object[] parameters)
{
}

public void Warning(Exception exception, string message, params object[] parameters)
{
}

public void Error(Exception exception, string message, params object[] parameters)
{
}

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
{
}
}
}

+ 6
- 5
Tests/MQTTnet.Core.Tests/MqttSubscriptionsManager_Tests.cs ファイルの表示

@@ -2,6 +2,7 @@
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MQTTnet.Tests.Mockups;

namespace MQTTnet.Tests
{
@@ -11,7 +12,7 @@ namespace MQTTnet.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(), new MqttServerOptions());
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -26,7 +27,7 @@ namespace MQTTnet.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeDifferentQoSSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(), new MqttServerOptions());
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter { Topic = "A/B/C", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
@@ -41,7 +42,7 @@ namespace MQTTnet.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeTwoTimesSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(), new MqttServerOptions());
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilter { Topic = "#", QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce });
@@ -57,7 +58,7 @@ namespace MQTTnet.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(), new MqttServerOptions());
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());
@@ -70,7 +71,7 @@ namespace MQTTnet.Tests
[TestMethod]
public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
{
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(), new MqttServerOptions());
var sm = new MqttClientSubscriptionsManager("", new MqttServerEventDispatcher(new TestLogger()), new MqttServerOptions());

var sp = new MqttSubscribePacket();
sp.TopicFilters.Add(new TopicFilterBuilder().WithTopic("A/B/C").Build());


読み込み中…
キャンセル
保存