Explorar el Código

improved client livetime events

release/3.x.x
JanEggers hace 7 años
padre
commit
3079ad37ce
Se han modificado 10 ficheros con 71 adiciones y 53 borrados
  1. +3
    -3
      Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs
  2. +3
    -3
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  3. +2
    -2
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  4. +1
    -1
      MQTTnet.Core/Adapter/IMqttServerAdapter.cs
  5. +5
    -7
      MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs
  6. +6
    -8
      MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs
  7. +1
    -1
      MQTTnet.Core/Server/IMqttServer.cs
  8. +10
    -12
      MQTTnet.Core/Server/MqttClientSession.cs
  9. +28
    -8
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  10. +12
    -8
      MQTTnet.Core/Server/MqttServer.cs

+ 3
- 3
Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs Ver fichero

@@ -22,7 +22,7 @@ namespace MQTTnet.Implementations

private bool _isRunning;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;

public Task StartAsync(MqttServerOptions options)
{
@@ -93,7 +93,7 @@ namespace MQTTnet.Implementations

var tcpChannel = new MqttTcpChannel(clientSocket, null);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
}
catch (Exception exception) when (!(exception is ObjectDisposedException))
{
@@ -118,7 +118,7 @@ namespace MQTTnet.Implementations

var tcpChannel = new MqttTcpChannel(clientSocket, sslStream);
var clientAdapter = new MqttChannelCommunicationAdapter(tcpChannel, new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
}
catch (Exception exception)
{


+ 3
- 3
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs Ver fichero

@@ -22,7 +22,7 @@ namespace MQTTnet.Implementations

private bool _isRunning;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;

public Task StartAsync(MqttServerOptions options)
{
@@ -90,7 +90,7 @@ namespace MQTTnet.Implementations
{
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
}
catch (Exception exception)
{
@@ -114,7 +114,7 @@ namespace MQTTnet.Implementations
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);

var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
}
catch (Exception exception)
{


+ 2
- 2
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs Ver fichero

@@ -14,7 +14,7 @@ namespace MQTTnet.Implementations

private bool _isRunning;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
public event Action<IMqttCommunicationAdapter> ClientAccepted;

public async Task StartAsync(MqttServerOptions options)
{
@@ -57,7 +57,7 @@ namespace MQTTnet.Implementations
try
{
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter));
ClientAccepted?.Invoke(clientAdapter);
}
catch (Exception exception)
{


+ 1
- 1
MQTTnet.Core/Adapter/IMqttServerAdapter.cs Ver fichero

@@ -6,7 +6,7 @@ namespace MQTTnet.Core.Adapter
{
public interface IMqttServerAdapter
{
event EventHandler<MqttClientConnectedEventArgs> ClientConnected;
event Action<IMqttCommunicationAdapter> ClientAccepted;

Task StartAsync(MqttServerOptions options);
Task StopAsync();


+ 5
- 7
MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs Ver fichero

@@ -1,17 +1,15 @@
using System;
using MQTTnet.Core.Server;
using System;

namespace MQTTnet.Core.Adapter
{
public class MqttClientConnectedEventArgs : EventArgs
{
public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter)
public MqttClientConnectedEventArgs(ConnectedMqttClient client)
{
Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier));
ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
Client = client ?? throw new ArgumentNullException(nameof(client));
}

public string Identifier { get; }

public IMqttCommunicationAdapter ClientAdapter { get; }
public ConnectedMqttClient Client { get; }
}
}

+ 6
- 8
MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs Ver fichero

@@ -1,17 +1,15 @@
using System;
using MQTTnet.Core.Server;
using System;

namespace MQTTnet.Core.Adapter
{
public class MqttClientDisconnectedEventArgs : EventArgs
{
public MqttClientDisconnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter)
public MqttClientDisconnectedEventArgs(ConnectedMqttClient client)
{
Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier));
ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter));
Client = client ?? throw new ArgumentNullException(nameof(client));
}

public string Identifier { get; }

public IMqttCommunicationAdapter ClientAdapter { get; }
public ConnectedMqttClient Client { get; }
}
}

+ 1
- 1
MQTTnet.Core/Server/IMqttServer.cs Ver fichero

@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Server
event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;

IList<ConnectedMqttClient> GetConnectedClients();
void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
void InjectClient(IMqttCommunicationAdapter adapter);
void Publish(MqttApplicationMessage applicationMessage);

Task StartAsync();


+ 10
- 12
MQTTnet.Core/Server/MqttClientSession.cs Ver fichero

@@ -19,8 +19,7 @@ namespace MQTTnet.Core.Server
private readonly MqttClientSessionsManager _mqttClientSessionsManager;
private readonly MqttClientPendingMessagesQueue _pendingMessagesQueue;
private readonly MqttServerOptions _options;

private string _identifier;
private CancellationTokenSource _cancellationTokenSource;
private MqttApplicationMessage _willMessage;

@@ -38,7 +37,7 @@ namespace MQTTnet.Core.Server

public IMqttCommunicationAdapter Adapter { get; private set; }

public async Task RunAsync(string identifier, MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
public async Task RunAsync(MqttApplicationMessage willMessage, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));

@@ -46,7 +45,6 @@ namespace MQTTnet.Core.Server

try
{
_identifier = identifier;
Adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource();

@@ -58,11 +56,11 @@ namespace MQTTnet.Core.Server
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier);
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
}
}

@@ -79,7 +77,7 @@ namespace MQTTnet.Core.Server

Adapter = null;

MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", _identifier);
MqttTrace.Information(nameof(MqttClientSession), "Client '{0}': Disconnected.", ClientId);
}

public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
@@ -92,7 +90,7 @@ namespace MQTTnet.Core.Server
}

_pendingMessagesQueue.Enqueue(publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", _identifier);
MqttTrace.Verbose(nameof(MqttClientSession), "Client '{0}': Enqueued pending publish packet.", ClientId);
}

public void Dispose()
@@ -116,12 +114,12 @@ namespace MQTTnet.Core.Server
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", _identifier);
MqttTrace.Warning(nameof(MqttClientSession), exception, "Client '{0}': Communication exception while processing client packets.", ClientId);
Stop();
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", _identifier);
MqttTrace.Error(nameof(MqttClientSession), exception, "Client '{0}': Unhandled exception while processing client packets.", ClientId);
Stop();
}
}
@@ -163,7 +161,7 @@ namespace MQTTnet.Core.Server
}
else
{
MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", _identifier, packet);
MqttTrace.Warning(nameof(MqttClientSession), "Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
_cancellationTokenSource.Cancel();
}
}
@@ -181,7 +179,7 @@ namespace MQTTnet.Core.Server
{
if (publishPacket.Retain)
{
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(_identifier, publishPacket);
await _mqttClientSessionsManager.RetainedMessagesManager.HandleMessageAsync(ClientId, publishPacket);
}

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)


+ 28
- 8
MQTTnet.Core/Server/MqttClientSessionsManager.cs Ver fichero

@@ -25,16 +25,18 @@ namespace MQTTnet.Core.Server

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

public event EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected;

public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }

public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter)
{
var clientId = string.Empty;
try
{
if (!(await eventArgs.ClientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
{
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
}
@@ -42,12 +44,12 @@ namespace MQTTnet.Core.Server
clientId = connectPacket.ClientId;

// Switch to the required protocol version before sending any response.
eventArgs.ClientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
clientAdapter.PacketSerializer.ProtocolVersion = connectPacket.ProtocolVersion;
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}).ConfigureAwait(false);
@@ -57,13 +59,19 @@ namespace MQTTnet.Core.Server

var clientSession = GetOrCreateClientSession(connectPacket);

await eventArgs.ClientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession
}).ConfigureAwait(false);

await clientSession.Session.RunAsync(eventArgs.Identifier, connectPacket.WillMessage, eventArgs.ClientAdapter).ConfigureAwait(false);
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}));

await clientSession.Session.RunAsync(connectPacket.WillMessage, clientAdapter).ConfigureAwait(false);
}
catch (Exception exception)
{
@@ -71,8 +79,20 @@ namespace MQTTnet.Core.Server
}
finally
{
await eventArgs.ClientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientId, eventArgs.ClientAdapter));
try
{
await clientAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout).ConfigureAwait(false);
}
catch (Exception)
{
//ignored
}

ClientDisconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(new ConnectedMqttClient
{
ClientId = clientId,
ProtocolVersion = clientAdapter.PacketSerializer.ProtocolVersion
}));
}
}



+ 12
- 8
MQTTnet.Core/Server/MqttServer.cs Ver fichero

@@ -23,6 +23,7 @@ namespace MQTTnet.Core.Server

_clientSessionsManager = new MqttClientSessionsManager(options);
_clientSessionsManager.ApplicationMessageReceived += (s, e) => ApplicationMessageReceived?.Invoke(s, e);
_clientSessionsManager.ClientConnected += OnClientConnected;
_clientSessionsManager.ClientDisconnected += OnClientDisconnected;
}

@@ -42,12 +43,12 @@ namespace MQTTnet.Core.Server
_clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket());
}

public void InjectClient(string identifier, IMqttCommunicationAdapter adapter)
public void InjectClient(IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
if (_cancellationTokenSource == null) throw new InvalidOperationException("The MQTT server is not started.");

OnClientConnected(this, new MqttClientConnectedEventArgs(identifier, adapter));
OnClientAccepted(adapter);
}

public async Task StartAsync()
@@ -60,7 +61,7 @@ namespace MQTTnet.Core.Server

foreach (var adapter in _adapters)
{
adapter.ClientConnected += OnClientConnected;
adapter.ClientAccepted += OnClientAccepted;
await adapter.StartAsync(_options);
}

@@ -75,7 +76,7 @@ namespace MQTTnet.Core.Server

foreach (var adapter in _adapters)
{
adapter.ClientConnected -= OnClientConnected;
adapter.ClientAccepted -= OnClientAccepted;
await adapter.StopAsync();
}

@@ -84,17 +85,20 @@ namespace MQTTnet.Core.Server
MqttTrace.Information(nameof(MqttServer), "Stopped.");
}

private void OnClientAccepted(IMqttCommunicationAdapter adapter)
{
Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(adapter), _cancellationTokenSource.Token);
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)
{
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Identifier);
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Connected.", eventArgs.Client.ClientId);
ClientConnected?.Invoke(this, eventArgs);

Task.Run(() => _clientSessionsManager.RunClientSessionAsync(eventArgs), _cancellationTokenSource.Token);
}

private void OnClientDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
{
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Identifier);
MqttTrace.Information(nameof(MqttServer), "Client '{0}': Disconnected.", eventArgs.Client.ClientId);
ClientDisconnected?.Invoke(this, eventArgs);
}
}


Cargando…
Cancelar
Guardar