Browse Source

Clean up

release/3.x.x
Lucas Rosa 5 years ago
parent
commit
5c25905510
6 changed files with 4 additions and 101 deletions
  1. +0
    -10
      Source/MQTTnet/Server/MqttClientConnection.cs
  2. +0
    -58
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  3. +0
    -7
      Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs
  4. +0
    -1
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  5. +2
    -2
      Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs
  6. +2
    -23
      Tests/MQTTnet.Core.Tests/Server_Tests.cs

+ 0
- 10
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -52,7 +52,6 @@ namespace MQTTnet.Server
MqttRetainedMessagesManager retainedMessagesManager,
IMqttNetChildLogger logger)
{
TestLogger.WriteLine($"MqttClientConnection init");
Session = session ?? throw new ArgumentNullException(nameof(session));
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager));
@@ -81,7 +80,6 @@ namespace MQTTnet.Server
public async Task StopAsync()
{
TestLogger.WriteLine($"MqttClientConnection stop");
StopInternal();

var task = _packageReceiverTask;
@@ -156,14 +154,12 @@ namespace MQTTnet.Server

while (!_cancellationToken.IsCancellationRequested)
{
TestLogger.WriteLine($"MqttClientConnection while");
var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationToken.Token).ConfigureAwait(false);
if (packet == null)
{
// The client has closed the connection gracefully.
break;
}
TestLogger.WriteLine($"MqttClientConnection pack");

Interlocked.Increment(ref _sentPacketsCount);
_lastPacketReceivedTimestamp = DateTime.UtcNow;
@@ -262,13 +258,11 @@ namespace MQTTnet.Server

private void StopInternal()
{
TestLogger.WriteLine($"MqttClientConnection stop int");
_cancellationToken.Cancel(false);
}

private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection<TopicFilter> topicFilters)
{
TestLogger.WriteLine($"MqttClientConnection retainedmessages");
var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false);
foreach (var applicationMessage in retainedMessages)
{
@@ -278,7 +272,6 @@ namespace MQTTnet.Server

private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket)
{
TestLogger.WriteLine($"MqttClientConnection subpacket");
// TODO: Let the channel adapter create the packet.
var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false);

@@ -302,7 +295,6 @@ namespace MQTTnet.Server

private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
{
TestLogger.WriteLine($"MqttClientConnection pub");
Interlocked.Increment(ref _sentApplicationMessagesCount);

switch (publishPacket.QualityOfServiceLevel)
@@ -360,7 +352,6 @@ namespace MQTTnet.Server

private async Task SendPendingPacketsAsync(CancellationToken cancellationToken)
{
TestLogger.WriteLine($"MqttClientConnection send");
MqttQueuedApplicationMessage queuedApplicationMessage = null;
MqttPublishPacket publishPacket = null;

@@ -476,7 +467,6 @@ namespace MQTTnet.Server

private async Task SendAsync(MqttBasePacket packet)
{
TestLogger.WriteLine($"MqttClientConnection send");
await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false);

Interlocked.Increment(ref _receivedPacketsCount);


+ 0
- 58
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -30,32 +30,6 @@ namespace MQTTnet.Server
private readonly IMqttServerOptions _options;
private readonly IMqttNetChildLogger _logger;

public static class TestLogger
{
private static object _lock = new object();
public static void WriteLine(string message)
{
lock (_lock)
{
var path = @"c:\temp\test1.txt";
FileStream logFile;
if (!System.IO.File.Exists(path))
logFile = System.IO.File.Create(path);
else
logFile = System.IO.File.Open(path, FileMode.Append);

using (var writer = new System.IO.StreamWriter(logFile))
{
writer.WriteLine($"{DateTime.Now} - {message}");
}

logFile.Dispose();
}

}
}


public MqttClientSessionsManager(
IMqttServerOptions options,
MqttRetainedMessagesManager retainedMessagesManager,
@@ -63,7 +37,6 @@ namespace MQTTnet.Server
MqttServerEventDispatcher eventDispatcher,
IMqttNetChildLogger logger)
{
TestLogger.WriteLine("Newly new");
_cancellationToken = cancellationToken;

if (logger == null) throw new ArgumentNullException(nameof(logger));
@@ -76,13 +49,11 @@ namespace MQTTnet.Server

public void Start()
{
TestLogger.WriteLine("Start");
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger);
}

public async Task StopAsync()
{
TestLogger.WriteLine("Stop");
foreach (var connection in _connections.Values)
{
await connection.StopAsync().ConfigureAwait(false);
@@ -96,7 +67,6 @@ namespace MQTTnet.Server

public Task<IList<IMqttClientStatus>> GetClientStatusAsync()
{
TestLogger.WriteLine("Status");
var result = new List<IMqttClientStatus>();

foreach (var connection in _connections.Values)
@@ -116,7 +86,6 @@ namespace MQTTnet.Server

public Task<IList<IMqttSessionStatus>> GetSessionStatusAsync()
{
TestLogger.WriteLine("Session");
var result = new List<IMqttSessionStatus>();

foreach (var session in _sessions.Values)
@@ -132,7 +101,6 @@ namespace MQTTnet.Server

public void DispatchApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender)
{
TestLogger.WriteLine("Message");
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

_messageQueue.Enqueue(new MqttEnqueuedApplicationMessage(applicationMessage, sender));
@@ -140,7 +108,6 @@ namespace MQTTnet.Server

public Task SubscribeAsync(string clientId, ICollection<TopicFilter> topicFilters)
{
TestLogger.WriteLine("sub");
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

@@ -154,7 +121,6 @@ namespace MQTTnet.Server

public Task UnsubscribeAsync(string clientId, IEnumerable<string> topicFilters)
{
TestLogger.WriteLine("unsub");
if (clientId == null) throw new ArgumentNullException(nameof(clientId));
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

@@ -168,7 +134,6 @@ namespace MQTTnet.Server

public async Task DeleteSessionAsync(string clientId)
{
TestLogger.WriteLine("Delete");
if (_connections.TryGetValue(clientId, out var connection))
{
await connection.StopAsync().ConfigureAwait(false);
@@ -183,13 +148,11 @@ namespace MQTTnet.Server

public void Dispose()
{
TestLogger.WriteLine("byebye");
_messageQueue?.Dispose();
}

private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken)
{
TestLogger.WriteLine("queue");
while (!cancellationToken.IsCancellationRequested)
{
try
@@ -208,7 +171,6 @@ namespace MQTTnet.Server

private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken)
{
TestLogger.WriteLine("pm process message");
try
{
if (cancellationToken.IsCancellationRequested)
@@ -217,7 +179,6 @@ namespace MQTTnet.Server
}

var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false);
TestLogger.WriteLine("pm dequeued");
var queuedApplicationMessage = dequeueResult.Item;

var sender = queuedApplicationMessage.Sender;
@@ -249,7 +210,6 @@ namespace MQTTnet.Server
await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false);
}

TestLogger.WriteLine($"pm sessions: {_sessions.Count}");
foreach (var clientSession in _sessions.Values)
{
clientSession.EnqueueApplicationMessage(
@@ -260,18 +220,15 @@ namespace MQTTnet.Server
}
catch (OperationCanceledException)
{
TestLogger.WriteLine($"pm no queue");
}
catch (Exception exception)
{
TestLogger.WriteLine($"pm no queue {exception}");
_logger.Error(exception, "Unhandled exception while processing next queued application message.");
}
}

private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken)
{
TestLogger.WriteLine($"handle");
var disconnectType = MqttClientDisconnectType.NotClean;
string clientId = null;

@@ -287,13 +244,11 @@ namespace MQTTnet.Server
}

clientId = connectPacket.ClientId;
TestLogger.WriteLine($"validating {clientId}");

var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false);

if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success)
{
TestLogger.WriteLine($"{clientId} not good");
ok = false;
// Send failure response here without preparing a session. The result for a successful connect
// will be sent from the session itself.
@@ -303,30 +258,24 @@ namespace MQTTnet.Server
return;
}

TestLogger.WriteLine($"{clientId} good");

var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false);

await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false);
disconnectType = await connection.RunAsync().ConfigureAwait(false);

TestLogger.WriteLine($"{clientId} all good");
}
catch (OperationCanceledException)
{
TestLogger.WriteLine($"no");
}
catch (Exception exception)
{
TestLogger.WriteLine($"no {exception}");
_logger.Error(exception, exception.Message);
}
finally
{
if (ok)
{
TestLogger.WriteLine($"finally {clientId}");
if (clientId != null)
{
_connections.TryRemove(clientId, out _);
@@ -349,7 +298,6 @@ namespace MQTTnet.Server

private async Task<MqttConnectionValidatorContext> ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter)
{
TestLogger.WriteLine("validate");
var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary<object, object>());

var connectionValidator = _options.ConnectionValidator;
@@ -378,7 +326,6 @@ namespace MQTTnet.Server

private async Task<MqttClientConnection> CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter)
{
TestLogger.WriteLine("cc create");
await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false);
try
{
@@ -407,7 +354,6 @@ namespace MQTTnet.Server
if (session == null)
{
session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _logger);
TestLogger.WriteLine("cc created");
_logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId);
}

@@ -415,8 +361,6 @@ namespace MQTTnet.Server

_connections[connection.ClientId] = connection;
_sessions[session.ClientId] = session;
TestLogger.WriteLine($"cc connections {_connections.Count}");
TestLogger.WriteLine($"cc sessions {_sessions.Count}");

return connection;
}
@@ -428,7 +372,6 @@ namespace MQTTnet.Server

private async Task<MqttApplicationMessageInterceptorContext> InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage)
{
TestLogger.WriteLine("intercept");
var interceptor = _options.ApplicationMessageInterceptor;
if (interceptor == null)
{
@@ -457,7 +400,6 @@ namespace MQTTnet.Server

private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter)
{
TestLogger.WriteLine("clean");
try
{
await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false);


+ 0
- 7
Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs View File

@@ -17,7 +17,6 @@ namespace MQTTnet.Server

public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions)
{
TestLogger.WriteLine("sub manager");
_clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession));

// TODO: Consider removing the server options here and build a new class "ISubscriptionInterceptor" and just pass it. The instance is generated in the root server class upon start.
@@ -27,7 +26,6 @@ namespace MQTTnet.Server

public async Task<MqttClientSubscribeResult> SubscribeAsync(MqttSubscribePacket subscribePacket, MqttConnectPacket connectPacket)
{
TestLogger.WriteLine("sub1");
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket));

@@ -79,7 +77,6 @@ namespace MQTTnet.Server

public async Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
{
TestLogger.WriteLine("sub2");
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

foreach (var topicFilter in topicFilters)
@@ -104,7 +101,6 @@ namespace MQTTnet.Server

public async Task<MqttUnsubAckPacket> UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket)
{
TestLogger.WriteLine("unsub1");
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));

var unsubAckPacket = new MqttUnsubAckPacket
@@ -137,7 +133,6 @@ namespace MQTTnet.Server

public Task UnsubscribeAsync(IEnumerable<string> topicFilters)
{
TestLogger.WriteLine("unsub2");
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));

lock (_subscriptions)
@@ -153,7 +148,6 @@ namespace MQTTnet.Server

public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel)
{
TestLogger.WriteLine("check subs");
var qosLevels = new HashSet<MqttQualityOfServiceLevel>();

lock (_subscriptions)
@@ -164,7 +158,6 @@ namespace MQTTnet.Server
{
continue;
}
TestLogger.WriteLine("is match");
qosLevels.Add(subscription.Value.QualityOfServiceLevel);
}
}


+ 0
- 1
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -56,7 +56,6 @@ namespace MQTTnet.Server

public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter)
{
TestLogger.WriteLine("handle sub");
var handler = ClientSubscribedTopicHandler;
if (handler == null)
{


+ 2
- 2
Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs View File

@@ -114,7 +114,7 @@ namespace MQTTnet.Tests.Mockups
{
if (!IgnoreServerLogErrors && _serverErrors.Count > 0)
{
//throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)}).");
throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)}).");
}
}

@@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups
{
if (!IgnoreClientLogErrors && _clientErrors.Count > 0)
{
//throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)}).");
throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)}).");
}
}
}


+ 2
- 23
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -936,6 +936,8 @@ namespace MQTTnet.Tests
{
using (var testEnvironment = new TestEnvironment())
{
testEnvironment.IgnoreClientLogErrors = true;

_connected = new Dictionary<string, bool>();
var options = new MqttServerOptionsBuilder();
options.WithConnectionValidator(e => ConnectionValidationHandler(e));
@@ -943,8 +945,6 @@ namespace MQTTnet.Tests

var events = new List<string>();

var connected = true;

server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ =>
{
lock (events)
@@ -972,15 +972,6 @@ namespace MQTTnet.Tests
lock (events)
{
events.Add("x");
connected = false;
}
});

c1.UseConnectedHandler(_ =>
{
lock (events)
{
connected = true;
}
});

@@ -1002,7 +993,6 @@ namespace MQTTnet.Tests

await Task.Delay(500);


var flow = string.Join(string.Empty, events);
Assert.AreEqual("cr", flow);

@@ -1018,16 +1008,6 @@ namespace MQTTnet.Tests

await Task.Delay(500);

/*if (!connected)
{
c1.ReconnectAsync().Wait();
}

if (!c1.IsConnected)
{
c1.ReconnectAsync().Wait();
}*/

flow = string.Join(string.Empty, events);
Assert.AreEqual("cr", flow);

@@ -1037,7 +1017,6 @@ namespace MQTTnet.Tests

flow = string.Join(string.Empty, events);
Assert.AreEqual("crr", flow);

}
}



Loading…
Cancel
Save