Browse Source

Improve connection handling.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
5c903e6fbf
2 changed files with 12 additions and 18 deletions
  1. +10
    -8
      Source/MQTTnet/Implementations/MqttTcpServerListener.cs
  2. +2
    -10
      Source/MQTTnet/Server/MqttClientSessionsManager.cs

+ 10
- 8
Source/MQTTnet/Implementations/MqttTcpServerListener.cs View File

@@ -82,7 +82,9 @@ namespace MQTTnet.Implementations
#else
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
#endif
await HandleClientConnection(clientSocket).ConfigureAwait(false);
#pragma warning disable 4014
Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken);
#pragma warning restore 4014
}
catch (Exception exception)
{
@@ -92,12 +94,17 @@ namespace MQTTnet.Implementations
}
}

private async Task HandleClientConnection(Socket clientSocket)
private async Task TryHandleClientConnectionAsync(Socket clientSocket)
{
Stream stream = null;

try
{
_logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.",
clientSocket.RemoteEndPoint,
_socket.LocalEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");

clientSocket.NoDelay = _options.NoDelay;

stream = new NetworkStream(clientSocket, true);
@@ -109,11 +116,6 @@ namespace MQTTnet.Implementations
stream = sslStream;
}

_logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.",
clientSocket.RemoteEndPoint,
_socket.LocalEndPoint,
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6");

var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(stream), new MqttPacketFormatterAdapter(), _logger);
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
}
@@ -138,7 +140,7 @@ namespace MQTTnet.Implementations
}
catch (Exception disposeException)
{
throw new AggregateException(exception, disposeException);
_logger.Error(disposeException, "Error while cleanup of broken connection.");
}
}
}


+ 2
- 10
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -274,17 +274,9 @@ namespace MQTTnet.Server
{
_connections.TryRemove(clientId, out _);

////connection?.ReferenceCounter.Decrement();
////if (connection?.ReferenceCounter.HasReferences == true)
////{
//// disconnectType = MqttClientDisconnectType.Takeover;
////}
////else
if (!_options.EnablePersistentSessions)
{
if (!_options.EnablePersistentSessions)
{
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}
await DeleteSessionAsync(clientId).ConfigureAwait(false);
}

await TryCleanupChannelAsync(channelAdapter).ConfigureAwait(false);


Loading…
Cancel
Save