From 5c25905510d2da368df6bd7626ede57057b0d9f5 Mon Sep 17 00:00:00 2001 From: Lucas Rosa Date: Tue, 29 Oct 2019 12:48:04 -0300 Subject: [PATCH] Clean up --- Source/MQTTnet/Server/MqttClientConnection.cs | 10 ---- .../Server/MqttClientSessionsManager.cs | 58 ------------------- .../Server/MqttClientSubscriptionsManager.cs | 7 --- .../Server/MqttServerEventDispatcher.cs | 1 - .../Mockups/TestEnvironment.cs | 4 +- Tests/MQTTnet.Core.Tests/Server_Tests.cs | 25 +------- 6 files changed, 4 insertions(+), 101 deletions(-) diff --git a/Source/MQTTnet/Server/MqttClientConnection.cs b/Source/MQTTnet/Server/MqttClientConnection.cs index 677f920..a114e28 100644 --- a/Source/MQTTnet/Server/MqttClientConnection.cs +++ b/Source/MQTTnet/Server/MqttClientConnection.cs @@ -52,7 +52,6 @@ namespace MQTTnet.Server MqttRetainedMessagesManager retainedMessagesManager, IMqttNetChildLogger logger) { - TestLogger.WriteLine($"MqttClientConnection init"); Session = session ?? throw new ArgumentNullException(nameof(session)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _sessionsManager = sessionsManager ?? throw new ArgumentNullException(nameof(sessionsManager)); @@ -81,7 +80,6 @@ namespace MQTTnet.Server public async Task StopAsync() { - TestLogger.WriteLine($"MqttClientConnection stop"); StopInternal(); var task = _packageReceiverTask; @@ -156,14 +154,12 @@ namespace MQTTnet.Server while (!_cancellationToken.IsCancellationRequested) { - TestLogger.WriteLine($"MqttClientConnection while"); var packet = await _channelAdapter.ReceivePacketAsync(TimeSpan.Zero, _cancellationToken.Token).ConfigureAwait(false); if (packet == null) { // The client has closed the connection gracefully. break; } - TestLogger.WriteLine($"MqttClientConnection pack"); Interlocked.Increment(ref _sentPacketsCount); _lastPacketReceivedTimestamp = DateTime.UtcNow; @@ -262,13 +258,11 @@ namespace MQTTnet.Server private void StopInternal() { - TestLogger.WriteLine($"MqttClientConnection stop int"); _cancellationToken.Cancel(false); } private async Task EnqueueSubscribedRetainedMessagesAsync(ICollection topicFilters) { - TestLogger.WriteLine($"MqttClientConnection retainedmessages"); var retainedMessages = await _retainedMessagesManager.GetSubscribedMessagesAsync(topicFilters).ConfigureAwait(false); foreach (var applicationMessage in retainedMessages) { @@ -278,7 +272,6 @@ namespace MQTTnet.Server private async Task HandleIncomingSubscribePacketAsync(MqttSubscribePacket subscribePacket) { - TestLogger.WriteLine($"MqttClientConnection subpacket"); // TODO: Let the channel adapter create the packet. var subscribeResult = await Session.SubscriptionsManager.SubscribeAsync(subscribePacket, ConnectPacket).ConfigureAwait(false); @@ -302,7 +295,6 @@ namespace MQTTnet.Server private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket) { - TestLogger.WriteLine($"MqttClientConnection pub"); Interlocked.Increment(ref _sentApplicationMessagesCount); switch (publishPacket.QualityOfServiceLevel) @@ -360,7 +352,6 @@ namespace MQTTnet.Server private async Task SendPendingPacketsAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine($"MqttClientConnection send"); MqttQueuedApplicationMessage queuedApplicationMessage = null; MqttPublishPacket publishPacket = null; @@ -476,7 +467,6 @@ namespace MQTTnet.Server private async Task SendAsync(MqttBasePacket packet) { - TestLogger.WriteLine($"MqttClientConnection send"); await _channelAdapter.SendPacketAsync(packet, _serverOptions.DefaultCommunicationTimeout, _cancellationToken.Token).ConfigureAwait(false); Interlocked.Increment(ref _receivedPacketsCount); diff --git a/Source/MQTTnet/Server/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/MqttClientSessionsManager.cs index 61a3bf2..5a27b46 100644 --- a/Source/MQTTnet/Server/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSessionsManager.cs @@ -30,32 +30,6 @@ namespace MQTTnet.Server private readonly IMqttServerOptions _options; private readonly IMqttNetChildLogger _logger; - public static class TestLogger - { - private static object _lock = new object(); - public static void WriteLine(string message) - { - lock (_lock) - { - var path = @"c:\temp\test1.txt"; - FileStream logFile; - if (!System.IO.File.Exists(path)) - logFile = System.IO.File.Create(path); - else - logFile = System.IO.File.Open(path, FileMode.Append); - - using (var writer = new System.IO.StreamWriter(logFile)) - { - writer.WriteLine($"{DateTime.Now} - {message}"); - } - - logFile.Dispose(); - } - - } - } - - public MqttClientSessionsManager( IMqttServerOptions options, MqttRetainedMessagesManager retainedMessagesManager, @@ -63,7 +37,6 @@ namespace MQTTnet.Server MqttServerEventDispatcher eventDispatcher, IMqttNetChildLogger logger) { - TestLogger.WriteLine("Newly new"); _cancellationToken = cancellationToken; if (logger == null) throw new ArgumentNullException(nameof(logger)); @@ -76,13 +49,11 @@ namespace MQTTnet.Server public void Start() { - TestLogger.WriteLine("Start"); Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger); } public async Task StopAsync() { - TestLogger.WriteLine("Stop"); foreach (var connection in _connections.Values) { await connection.StopAsync().ConfigureAwait(false); @@ -96,7 +67,6 @@ namespace MQTTnet.Server public Task> GetClientStatusAsync() { - TestLogger.WriteLine("Status"); var result = new List(); foreach (var connection in _connections.Values) @@ -116,7 +86,6 @@ namespace MQTTnet.Server public Task> GetSessionStatusAsync() { - TestLogger.WriteLine("Session"); var result = new List(); foreach (var session in _sessions.Values) @@ -132,7 +101,6 @@ namespace MQTTnet.Server public void DispatchApplicationMessage(MqttApplicationMessage applicationMessage, MqttClientConnection sender) { - TestLogger.WriteLine("Message"); if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); _messageQueue.Enqueue(new MqttEnqueuedApplicationMessage(applicationMessage, sender)); @@ -140,7 +108,6 @@ namespace MQTTnet.Server public Task SubscribeAsync(string clientId, ICollection topicFilters) { - TestLogger.WriteLine("sub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -154,7 +121,6 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(string clientId, IEnumerable topicFilters) { - TestLogger.WriteLine("unsub"); if (clientId == null) throw new ArgumentNullException(nameof(clientId)); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -168,7 +134,6 @@ namespace MQTTnet.Server public async Task DeleteSessionAsync(string clientId) { - TestLogger.WriteLine("Delete"); if (_connections.TryGetValue(clientId, out var connection)) { await connection.StopAsync().ConfigureAwait(false); @@ -183,13 +148,11 @@ namespace MQTTnet.Server public void Dispose() { - TestLogger.WriteLine("byebye"); _messageQueue?.Dispose(); } private async Task TryProcessQueuedApplicationMessagesAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine("queue"); while (!cancellationToken.IsCancellationRequested) { try @@ -208,7 +171,6 @@ namespace MQTTnet.Server private async Task TryProcessNextQueuedApplicationMessageAsync(CancellationToken cancellationToken) { - TestLogger.WriteLine("pm process message"); try { if (cancellationToken.IsCancellationRequested) @@ -217,7 +179,6 @@ namespace MQTTnet.Server } var dequeueResult = await _messageQueue.TryDequeueAsync(cancellationToken).ConfigureAwait(false); - TestLogger.WriteLine("pm dequeued"); var queuedApplicationMessage = dequeueResult.Item; var sender = queuedApplicationMessage.Sender; @@ -249,7 +210,6 @@ namespace MQTTnet.Server await _retainedMessagesManager.HandleMessageAsync(sender?.ClientId, applicationMessage).ConfigureAwait(false); } - TestLogger.WriteLine($"pm sessions: {_sessions.Count}"); foreach (var clientSession in _sessions.Values) { clientSession.EnqueueApplicationMessage( @@ -260,18 +220,15 @@ namespace MQTTnet.Server } catch (OperationCanceledException) { - TestLogger.WriteLine($"pm no queue"); } catch (Exception exception) { - TestLogger.WriteLine($"pm no queue {exception}"); _logger.Error(exception, "Unhandled exception while processing next queued application message."); } } private async Task HandleClientAsync(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { - TestLogger.WriteLine($"handle"); var disconnectType = MqttClientDisconnectType.NotClean; string clientId = null; @@ -287,13 +244,11 @@ namespace MQTTnet.Server } clientId = connectPacket.ClientId; - TestLogger.WriteLine($"validating {clientId}"); var connectionValidatorContext = await ValidateConnectionAsync(connectPacket, channelAdapter).ConfigureAwait(false); if (connectionValidatorContext.ReasonCode != MqttConnectReasonCode.Success) { - TestLogger.WriteLine($"{clientId} not good"); ok = false; // Send failure response here without preparing a session. The result for a successful connect // will be sent from the session itself. @@ -303,30 +258,24 @@ namespace MQTTnet.Server return; } - TestLogger.WriteLine($"{clientId} good"); - var connection = await CreateConnectionAsync(connectPacket, connectionValidatorContext, channelAdapter).ConfigureAwait(false); await _eventDispatcher.HandleClientConnectedAsync(clientId).ConfigureAwait(false); disconnectType = await connection.RunAsync().ConfigureAwait(false); - TestLogger.WriteLine($"{clientId} all good"); } catch (OperationCanceledException) { - TestLogger.WriteLine($"no"); } catch (Exception exception) { - TestLogger.WriteLine($"no {exception}"); _logger.Error(exception, exception.Message); } finally { if (ok) { - TestLogger.WriteLine($"finally {clientId}"); if (clientId != null) { _connections.TryRemove(clientId, out _); @@ -349,7 +298,6 @@ namespace MQTTnet.Server private async Task ValidateConnectionAsync(MqttConnectPacket connectPacket, IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("validate"); var context = new MqttConnectionValidatorContext(connectPacket, channelAdapter, new ConcurrentDictionary()); var connectionValidator = _options.ConnectionValidator; @@ -378,7 +326,6 @@ namespace MQTTnet.Server private async Task CreateConnectionAsync(MqttConnectPacket connectPacket, MqttConnectionValidatorContext connectionValidatorContext, IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("cc create"); await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false); try { @@ -407,7 +354,6 @@ namespace MQTTnet.Server if (session == null) { session = new MqttClientSession(connectPacket.ClientId, connectionValidatorContext.SessionItems, _eventDispatcher, _options, _logger); - TestLogger.WriteLine("cc created"); _logger.Verbose("Created a new session for client '{0}'.", connectPacket.ClientId); } @@ -415,8 +361,6 @@ namespace MQTTnet.Server _connections[connection.ClientId] = connection; _sessions[session.ClientId] = session; - TestLogger.WriteLine($"cc connections {_connections.Count}"); - TestLogger.WriteLine($"cc sessions {_sessions.Count}"); return connection; } @@ -428,7 +372,6 @@ namespace MQTTnet.Server private async Task InterceptApplicationMessageAsync(MqttClientConnection senderConnection, MqttApplicationMessage applicationMessage) { - TestLogger.WriteLine("intercept"); var interceptor = _options.ApplicationMessageInterceptor; if (interceptor == null) { @@ -457,7 +400,6 @@ namespace MQTTnet.Server private async Task TryCleanupChannelAsync(IMqttChannelAdapter channelAdapter) { - TestLogger.WriteLine("clean"); try { await channelAdapter.DisconnectAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false); diff --git a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs index 3e39bab..1d675ba 100644 --- a/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs +++ b/Source/MQTTnet/Server/MqttClientSubscriptionsManager.cs @@ -17,7 +17,6 @@ namespace MQTTnet.Server public MqttClientSubscriptionsManager(MqttClientSession clientSession, MqttServerEventDispatcher eventDispatcher, IMqttServerOptions serverOptions) { - TestLogger.WriteLine("sub manager"); _clientSession = clientSession ?? throw new ArgumentNullException(nameof(clientSession)); // TODO: Consider removing the server options here and build a new class "ISubscriptionInterceptor" and just pass it. The instance is generated in the root server class upon start. @@ -27,7 +26,6 @@ namespace MQTTnet.Server public async Task SubscribeAsync(MqttSubscribePacket subscribePacket, MqttConnectPacket connectPacket) { - TestLogger.WriteLine("sub1"); if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket)); if (connectPacket == null) throw new ArgumentNullException(nameof(connectPacket)); @@ -79,7 +77,6 @@ namespace MQTTnet.Server public async Task SubscribeAsync(IEnumerable topicFilters) { - TestLogger.WriteLine("sub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); foreach (var topicFilter in topicFilters) @@ -104,7 +101,6 @@ namespace MQTTnet.Server public async Task UnsubscribeAsync(MqttUnsubscribePacket unsubscribePacket) { - TestLogger.WriteLine("unsub1"); if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket)); var unsubAckPacket = new MqttUnsubAckPacket @@ -137,7 +133,6 @@ namespace MQTTnet.Server public Task UnsubscribeAsync(IEnumerable topicFilters) { - TestLogger.WriteLine("unsub2"); if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); lock (_subscriptions) @@ -153,7 +148,6 @@ namespace MQTTnet.Server public CheckSubscriptionsResult CheckSubscriptions(string topic, MqttQualityOfServiceLevel qosLevel) { - TestLogger.WriteLine("check subs"); var qosLevels = new HashSet(); lock (_subscriptions) @@ -164,7 +158,6 @@ namespace MQTTnet.Server { continue; } - TestLogger.WriteLine("is match"); qosLevels.Add(subscription.Value.QualityOfServiceLevel); } } diff --git a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs index d230eac..f899081 100644 --- a/Source/MQTTnet/Server/MqttServerEventDispatcher.cs +++ b/Source/MQTTnet/Server/MqttServerEventDispatcher.cs @@ -56,7 +56,6 @@ namespace MQTTnet.Server public Task HandleClientSubscribedTopicAsync(string clientId, TopicFilter topicFilter) { - TestLogger.WriteLine("handle sub"); var handler = ClientSubscribedTopicHandler; if (handler == null) { diff --git a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs index b5fa8a3..5cda537 100644 --- a/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs +++ b/Tests/MQTTnet.Core.Tests/Mockups/TestEnvironment.cs @@ -114,7 +114,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreServerLogErrors && _serverErrors.Count > 0) { - //throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); + throw new Exception($"Server had {_serverErrors.Count} errors (${string.Join(Environment.NewLine, _serverErrors)})."); } } @@ -122,7 +122,7 @@ namespace MQTTnet.Tests.Mockups { if (!IgnoreClientLogErrors && _clientErrors.Count > 0) { - //throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); + throw new Exception($"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})."); } } } diff --git a/Tests/MQTTnet.Core.Tests/Server_Tests.cs b/Tests/MQTTnet.Core.Tests/Server_Tests.cs index de3e871..c3d6df1 100644 --- a/Tests/MQTTnet.Core.Tests/Server_Tests.cs +++ b/Tests/MQTTnet.Core.Tests/Server_Tests.cs @@ -936,6 +936,8 @@ namespace MQTTnet.Tests { using (var testEnvironment = new TestEnvironment()) { + testEnvironment.IgnoreClientLogErrors = true; + _connected = new Dictionary(); var options = new MqttServerOptionsBuilder(); options.WithConnectionValidator(e => ConnectionValidationHandler(e)); @@ -943,8 +945,6 @@ namespace MQTTnet.Tests var events = new List(); - var connected = true; - server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(_ => { lock (events) @@ -972,15 +972,6 @@ namespace MQTTnet.Tests lock (events) { events.Add("x"); - connected = false; - } - }); - - c1.UseConnectedHandler(_ => - { - lock (events) - { - connected = true; } }); @@ -1002,7 +993,6 @@ namespace MQTTnet.Tests await Task.Delay(500); - var flow = string.Join(string.Empty, events); Assert.AreEqual("cr", flow); @@ -1018,16 +1008,6 @@ namespace MQTTnet.Tests await Task.Delay(500); - /*if (!connected) - { - c1.ReconnectAsync().Wait(); - } - - if (!c1.IsConnected) - { - c1.ReconnectAsync().Wait(); - }*/ - flow = string.Join(string.Empty, events); Assert.AreEqual("cr", flow); @@ -1037,7 +1017,6 @@ namespace MQTTnet.Tests flow = string.Join(string.Empty, events); Assert.AreEqual("crr", flow); - } }