Browse Source

Merge pull request #51 from JanEggers/Fix50_Eventing

Fix50 eventing
release/3.x.x
Christian 7 years ago
committed by GitHub
parent
commit
d23ab88add
15 changed files with 158 additions and 231 deletions
  1. +3
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +3
    -3
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  3. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  4. +1
    -1
      MQTTnet.Core/Adapter/IMqttServerAdapter.cs
  5. +5
    -7
      MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs
  6. +6
    -8
      MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs
  7. +1
    -2
      MQTTnet.Core/Server/IMqttServer.cs
  8. +12
    -14
      MQTTnet.Core/Server/MqttClientSession.cs
  9. +29
    -9
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  10. +11
    -15
      MQTTnet.Core/Server/MqttServer.cs
  11. +8
    -8
      MQTTnet.sln
  12. +13
    -108
      Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj
  13. +25
    -33
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  14. +0
    -15
      Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs
  15. +39
    -3
      Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs

+ 3
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs View File

@@ -22,7 +22,7 @@ namespace MQTTnet.Implementations


private bool _isRunning; private bool _isRunning;


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;


public Task StartAsync(MqttServerOptions options) public Task StartAsync(MqttServerOptions options)
{ {
@@ -93,7 +93,7 @@ namespace MQTTnet.Implementations


var tcpChannel = new MqttTcpChannel(clientSocket, null); var tcpChannel = new MqttTcpChannel(clientSocket, null);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
} }
catch (Exception exception) when (!(exception is ObjectDisposedException)) catch (Exception exception) when (!(exception is ObjectDisposedException))
{ {
@@ -118,7 +118,7 @@ namespace MQTTnet.Implementations


var tcpChannel = new MqttTcpChannel(clientSocket, sslStream); var tcpChannel = new MqttTcpChannel(clientSocket, sslStream);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 3
- 3
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs View File

@@ -22,7 +22,7 @@ namespace MQTTnet.Implementations


private bool _isRunning; private bool _isRunning;


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;


public Task StartAsync(MqttServerOptions options) public Task StartAsync(MqttServerOptions options)
{ {
@@ -90,7 +90,7 @@ namespace MQTTnet.Implementations
{ {
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false); var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -114,7 +114,7 @@ namespace MQTTnet.Implementations
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);


var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs View File

@@ -14,7 +14,7 @@ namespace MQTTnet.Implementations


private bool _isRunning; private bool _isRunning;


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;


public async Task StartAsync(MqttServerOptions options) public async Task StartAsync(MqttServerOptions options)
{ {
@@ -57,7 +57,7 @@ namespace MQTTnet.Implementations
try try
{ {
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer()); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
} }
catch (Exception exception) catch (Exception exception)
{ {


+ 1
- 1
MQTTnet.Core/Adapter/IMqttServerAdapter.cs View File

@@ -6,7 +6,7 @@ namespace MQTTnet.Core.Adapter
{ {
public interface IMqttServerAdapter public interface IMqttServerAdapter
{ {
event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event Action<IMqttCommunicationAdapter> ClientAccepted;


Task StartAsync(MqttServerOptions options); Task StartAsync(MqttServerOptions options);
Task StopAsync(); Task StopAsync();


+ 5
- 7
MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs View File

@@ -1,17 +1,15 @@
using System;
using MQTTnet.Core.Server;
using System;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
public class MqttClientConnectedEventArgs : EventArgs public class MqttClientConnectedEventArgs : EventArgs
{ {
public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter)
public MqttClientConnectedEventArgs(ConnectedMqttClient client)
{ {
Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier));
ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
Client = client ?? throw new ArgumentNullException(nameof(client));
} }


public string Identifier { get; }

public IMqttCommunicationAdapter ClientAdapter { get; }
public ConnectedMqttClient Client { get; }
} }
} }

+ 6
- 8
MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs View File

@@ -1,17 +1,15 @@
using System;
using MQTTnet.Core.Server;
using System;


namespace MQTTnet.Core.Adapter namespace MQTTnet.Core.Adapter
{ {
public class MqttClientDisconnectedEventArgs : EventArgs public class MqttClientDisconnectedEventArgs : EventArgs
{ {
public MqttClientDisconnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter)
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client)
{ {
Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier));
ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
Client = client ?? throw new ArgumentNullException(nameof(client));
} }

public string Identifier { get; }

public IMqttCommunicationAdapter ClientAdapter { get; }
public ConnectedMqttClient Client { get; }
} }
} }

+ 1
- 2
MQTTnet.Core/Server/IMqttServer.cs View File

@@ -11,8 +11,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttClientConnectedEventArgs> ClientConnected; event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;


IList<ConnectedMqttClient> GetConnectedClients();
void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
IReadOnlyList<ConnectedMqttClient> GetConnectedClients();
void Publish(MqttApplicationMessage applicationMessage); void Publish(MqttApplicationMessage applicationMessage);


Task StartAsync(); Task StartAsync();


+ 12
- 14
MQTTnet.Core/Server/MqttClientSession.cs View File

@@ -19,8 +19,7 @@ namespace MQTTnet.Core.Server
private readonly MqttClientSessionsManager _mqttClientSessionsManager; private readonly MqttClientSessionsManager _mqttClientSessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue; private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options; private readonly MqttServerOptions _options;

private string _identifier;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage; private MqttApplicationMessage _willMessage;


@@ -38,7 +37,7 @@ namespace MQTTnet.Core.Server


public IMqttCommunicationAdapter Adapter { get; private set; } public IMqttCommunicationAdapter Adapter { get; private set; }


public async Task RunAsync(string identifier, MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
{ {
if (adapter == null) throw new ArgumentNullException(nameof(adapter)); if (adapter == null) throw new ArgumentNullException(nameof(adapter));


@@ -46,7 +45,6 @@ namespace MQTTnet.Core.Server


try try
{ {
_identifier = identifier;
Adapter = adapter; Adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();


@@ -58,11 +56,11 @@ namespace MQTTnet.Core.Server
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier);
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
} }
} }


@@ -79,7 +77,7 @@ namespace MQTTnet.Core.Server


Adapter = null; Adapter = null;


MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", _identifier);
MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
} }


public void EnqueuePublishPacket(MqttPublishPacket publishPacket) public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
@@ -92,7 +90,7 @@ namespace MQTTnet.Core.Server
} }


_pendingMessagesQueue.Enqueue(publishPacket); _pendingMessagesQueue.Enqueue(publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier);
MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId);
} }


public void Dispose() public void Dispose()
@@ -116,12 +114,12 @@ namespace MQTTnet.Core.Server
} }
catch (MqttCommunicationException exception) catch (MqttCommunicationException exception)
{ {
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier);
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
Stop(); Stop();
} }
catch (Exception exception) catch (Exception exception)
{ {
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
Stop(); Stop();
} }
} }
@@ -159,12 +157,12 @@ namespace MQTTnet.Core.Server
} }
else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket) else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{ {
_cancellationTokenSource.Cancel();
Stop();
} }
else else
{ {
MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet);
_cancellationTokenSource.Cancel();
MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
Stop();
} }
} }


@@ -181,7 +179,7 @@ namespace MQTTnet.Core.Server
{ {
if (publishPacket.Retain) if (publishPacket.Retain)
{ {
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(_identifier, publishPacket);
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket);
} }


if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)


+ 29
- 9
MQTTnet.Core/Server/MqttClientSessionsManager.cs View File

@@ -25,16 +25,18 @@ namespace MQTTnet.Core.Server


public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived; public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;


public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected; public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;


public MqttClientRetainedMessagesManager RetainedMessagesManager { get; } public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }


public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter)
{ {
var clientId = string.Empty; var clientId = string.Empty;
try try
{ {
if (!(await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
{ {
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1]."); throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
} }
@@ -42,12 +44,12 @@ namespace MQTTnet.Core.Server
clientId = connectPacket.ClientId; clientId = connectPacket.ClientId;


// Switch to the required protocol version before sending any response. // Switch to the required protocol version before sending any response.
eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
var connectReturnCode = ValidateConnection(connectPacket); var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted) if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{ {
await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode ConnectReturnCode = connectReturnCode
}).ConfigureAwait(false); }).ConfigureAwait(false);
@@ -57,13 +59,19 @@ namespace MQTTnet.Core.Server


var clientSession = GetOrCreateClientSession(connectPacket); var clientSession = GetOrCreateClientSession(connectPacket);


await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{ {
ConnectReturnCode = connectReturnCode, ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession IsSessionPresent = clientSession.IsExistingSession
}).ConfigureAwait(false); }).ConfigureAwait(false);


await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false);
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}));

await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
} }
catch (Exception exception) catch (Exception exception)
{ {
@@ -71,8 +79,20 @@ namespace MQTTnet.Core.Server
} }
finally finally
{ {
await eventArgs.ClientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, eventArgs.ClientAdapter));
try
{
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
}
catch (Exception)
{
//ignored
}

ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}));
} }
} }


@@ -84,7 +104,7 @@ namespace MQTTnet.Core.Server
} }
} }


public IList<ConnectedMqttClient> GetConnectedClients()
public IReadOnlyList<ConnectedMqttClient> GetConnectedClients()
{ {
lock (_clientSessions) lock (_clientSessions)
{ {


+ 11
- 15
MQTTnet.Core/Server/MqttServer.cs View File

@@ -23,10 +23,11 @@ namespace MQTTnet.Core.Server


_clientSessionsManager = new MqttClientSessionsManager(options); _clientSessionsManager = new MqttClientSessionsManager(options);
_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e); _clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
_clientSessionsManager.ClientConnected += OnClientConnected;
_clientSessionsManager.ClientDisconnected += OnClientDisconnected; _clientSessionsManager.ClientDisconnected += OnClientDisconnected;
} }


public IList<ConnectedMqttClient> GetConnectedClients()
public IReadOnlyList<ConnectedMqttClient> GetConnectedClients()
{ {
return _clientSessionsManager.GetConnectedClients(); return _clientSessionsManager.GetConnectedClients();
} }
@@ -42,14 +43,6 @@ namespace MQTTnet.Core.Server
_clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket()); _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket());
} }


public void InjectClient(string identifier, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started.");

OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter));
}

public async Task StartAsync() public async Task StartAsync()
{ {
if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started."); if (_cancellationTokenSource != null) throw new InvalidOperationException("The MQTT server is already started.");
@@ -60,7 +53,7 @@ namespace MQTTnet.Core.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientConnected += OnClientConnected;
adapter.ClientAccepted += OnClientAccepted;
await adapter.StartAsync(_options); await adapter.StartAsync(_options);
} }


@@ -75,7 +68,7 @@ namespace MQTTnet.Core.Server


foreach (var adapter in _adapters) foreach (var adapter in _adapters)
{ {
adapter.ClientConnected -= OnClientConnected;
adapter.ClientAccepted -= OnClientAccepted;
await adapter.StopAsync(); await adapter.StopAsync();
} }


@@ -84,17 +77,20 @@ namespace MQTTnet.Core.Server
MqttTrace.Information(nameof(MqttServer), "Stopped."); MqttTrace.Information(nameof(MqttServer), "Stopped.");
} }


private void OnClientAccepted(IMqttCommunicationAdapter adapter)
{
Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(adapter), _cancellationTokenSource.Token);
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs) private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{ {
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier);
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId);
ClientConnected?.Invoke(this, eventArgs); ClientConnected?.Invoke(this, eventArgs);

Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token);
} }


private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs) private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
{ {
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Identifier);
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId);
ClientDisconnected?.Invoke(this, eventArgs); ClientDisconnected?.Invoke(this, eventArgs);
} }
} }


+ 8
- 8
MQTTnet.sln View File

@@ -50,18 +50,18 @@ Global
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|x64
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|x64
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|x86
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|x86
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU {A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|x64
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|x64
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|x86
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|x86
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU
{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.Build.0 = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.ActiveCfg = Debug|Any CPU {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}.Debug|ARM.ActiveCfg = Debug|Any CPU


+ 13
- 108
Tests/MQTTnet.Core.Tests/MQTTnet.Core.Tests.csproj View File

@@ -1,115 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup> <PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>MQTTnet.Core.Tests</RootNamespace>
<AssemblyName>MQTTnet.Core.Tests</AssemblyName>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
<OutputType>Exe</OutputType>
<DebugType>Full</DebugType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup> </PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Any CPU|AnyCPU'">
<OutputPath>bin\Any CPU\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<PlatformTarget>AnyCPU</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x64'">
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<DebugType>full</DebugType>
<PlatformTarget>x64</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x64'">
<OutputPath>bin\x64\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<PlatformTarget>x64</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Any CPU|x64'">
<OutputPath>bin\x64\Any CPU\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<PlatformTarget>x64</PlatformTarget>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisRuleSet>MinimumRecommendedRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x86'">
<PlatformTarget>x86</PlatformTarget>
<OutputPath>bin\x86\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x86'">
<PlatformTarget>x86</PlatformTarget>
<OutputPath>bin\x86\Release\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Any CPU|x86'">
<PlatformTarget>x86</PlatformTarget>
<OutputPath>bin\x86\Any CPU\</OutputPath>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
<Reference Include="System" />
<Reference Include="System.Core" />
</ItemGroup>
<ItemGroup>
<Compile Include="ByteReaderTests.cs" />
<Compile Include="ByteWriterTests.cs" />
<Compile Include="ExtensionTests.cs" />
<Compile Include="MqttPacketSerializerTests.cs" />
<Compile Include="MqttServerTests.cs" />
<Compile Include="MqttSubscriptionsManagerTests.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestMqttCommunicationAdapter.cs" />
<Compile Include="TestMqttServerAdapter.cs" />
<Compile Include="TopicFilterComparerTests.cs" />
</ItemGroup>

<ItemGroup> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
<PackageReference Include="MSTest.TestAdapter" Version="1.1.18" />
<PackageReference Include="MSTest.TestFramework" Version="1.1.18" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
</ItemGroup> </ItemGroup>

<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj">
<Project>{2ecb99e4-72d0-4c23-99ba-93d511d3967d}</Project>
<Name>MQTTnet.Core</Name>
</ProjectReference>
<ProjectReference Include="..\..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" />
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->

</Project> </Project>

+ 25
- 33
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -6,6 +6,7 @@ using MQTTnet.Core.Client;
using MQTTnet.Core.Packets; using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol; using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using System;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
@@ -48,12 +49,13 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public async Task MqttServer_WillMessage() public async Task MqttServer_WillMessage()
{ {
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
s.StartAsync();
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter });
await s.StartAsync();


var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false); var willMessage = new MqttApplicationMessage("My/last/will", new byte[0], MqttQualityOfServiceLevel.AtMostOnce, false);
var c1 = ConnectTestClient("c1", null, s);
var c2 = ConnectTestClient("c2", willMessage, s);
var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2", willMessage);


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
@@ -63,7 +65,7 @@ namespace MQTTnet.Core.Tests


await Task.Delay(1000); await Task.Delay(1000);


s.StopAsync();
await s.StopAsync();


Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);
} }
@@ -71,11 +73,12 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public async Task MqttServer_Unsubscribe() public async Task MqttServer_Unsubscribe()
{ {
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
s.StartAsync();
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter });
await s.StartAsync();


var c1 = ConnectTestClient("c1", null, s);
var c2 = ConnectTestClient("c2", null, s);
var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2");


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
@@ -97,7 +100,7 @@ namespace MQTTnet.Core.Tests
await Task.Delay(500); await Task.Delay(500);
Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);


s.StopAsync();
await s.StopAsync();
await Task.Delay(500); await Task.Delay(500);


Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);
@@ -106,10 +109,11 @@ namespace MQTTnet.Core.Tests
[TestMethod] [TestMethod]
public async Task MqttServer_Publish() public async Task MqttServer_Publish()
{ {
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
s.StartAsync();
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter });
await s.StartAsync();


var c1 = ConnectTestClient("c1", null, s);
var c1 = await serverAdapter.ConnectTestClient(s, "c1");


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
@@ -120,23 +124,10 @@ namespace MQTTnet.Core.Tests
s.Publish(message); s.Publish(message);
await Task.Delay(500); await Task.Delay(500);


s.StopAsync();
await s.StopAsync();


Assert.AreEqual(1, receivedMessagesCount); Assert.AreEqual(1, receivedMessagesCount);
}

private static MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server)
{
var adapterA = new TestMqttCommunicationAdapter();
var adapterB = new TestMqttCommunicationAdapter();
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;

var client = new MqttClient(new MqttClientOptions(), adapterA);
server.InjectClient(clientId, adapterB);
client.ConnectAsync(willMessage).Wait();
return client;
}
}


private async Task TestPublishAsync( private async Task TestPublishAsync(
string topic, string topic,
@@ -145,11 +136,12 @@ namespace MQTTnet.Core.Tests
MqttQualityOfServiceLevel filterQualityOfServiceLevel, MqttQualityOfServiceLevel filterQualityOfServiceLevel,
int expectedReceivedMessagesCount) int expectedReceivedMessagesCount)
{ {
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { new TestMqttServerAdapter() });
s.StartAsync();
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttServer(new MqttServerOptions(), new List<IMqttServerAdapter> { serverAdapter });
await s.StartAsync();


var c1 = ConnectTestClient("c1", null, s);
var c2 = ConnectTestClient("c2", null, s);
var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2");


var receivedMessagesCount = 0; var receivedMessagesCount = 0;
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
@@ -162,7 +154,7 @@ namespace MQTTnet.Core.Tests


await Task.Delay(500); await Task.Delay(500);


s.StopAsync();
await s.StopAsync();


Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount); Assert.AreEqual(expectedReceivedMessagesCount, receivedMessagesCount);
} }


+ 0
- 15
Tests/MQTTnet.Core.Tests/Properties/AssemblyInfo.cs View File

@@ -1,15 +0,0 @@
using System.Reflection;
using System.Runtime.InteropServices;

[assembly: AssemblyTitle("MQTTnet.Core.Tests")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Christian Kratky")]
[assembly: AssemblyProduct("MQTTnet")]
[assembly: AssemblyCopyright("Copyright © Christian Kratky 2016-2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
[assembly: Guid("a7ff0c91-25de-4ba6-b39e-f54e8dadf1cc")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

+ 39
- 3
Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs View File

@@ -2,16 +2,52 @@
using System.Threading.Tasks; using System.Threading.Tasks;
using MQTTnet.Core.Adapter; using MQTTnet.Core.Adapter;
using MQTTnet.Core.Server; using MQTTnet.Core.Server;
using MQTTnet.Core.Client;


namespace MQTTnet.Core.Tests namespace MQTTnet.Core.Tests
{ {
public class TestMqttServerAdapter : IMqttServerAdapter public class TestMqttServerAdapter : IMqttServerAdapter
{ {
public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;


public void FireClientConnectedEvent(MqttClientConnectedEventArgs eventArgs)
public async Task<MqttClient> ConnectTestClient(IMqttServer server, string clientId, MqttApplicationMessage willMessage = null)
{ {
ClientConnected?.Invoke(this, eventArgs);
var adapterA = new TestMqttCommunicationAdapter();
var adapterB = new TestMqttCommunicationAdapter();
adapterA.Partner = adapterB;
adapterB.Partner = adapterA;

var client = new MqttClient(new MqttClientOptions() { ClientId = clientId }, adapterA);
var connected = WaitForClientToConnect(server, clientId);
FireClientAcceptedEvent(adapterB);
await client.ConnectAsync(willMessage);
await connected;

return client;
}
private static Task WaitForClientToConnect(IMqttServer s, string clientId)
{
var tcs = new TaskCompletionSource<object>();

EventHandler<MqttClientConnectedEventArgs> handler = null;
handler = (sender, args) =>
{
if (args.Client.ClientId == clientId)
{
s.ClientConnected -= handler;
tcs.SetResult(null);
}
};

s.ClientConnected += handler;

return tcs.Task;
}

private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter)
{
ClientAccepted?.Invoke(adapter);
} }


public Task StartAsync(MqttServerOptions options) public Task StartAsync(MqttServerOptions options)


Loading…
Cancel
Save