Browse Source

Refactor event handler namings.

release/3.x.x
Christian Kratky 5 years ago
parent
commit
03bfc5f00b
45 changed files with 488 additions and 95 deletions
  1. +12
    -1
      README.md
  2. +31
    -0
      Source/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedHandlerDelegate.cs
  3. +31
    -0
      Source/MQTTnet.Extensions.ManagedClient/ApplicationMessageSkippedHandlerDelegate.cs
  4. +31
    -0
      Source/MQTTnet.Extensions.ManagedClient/ConnectingFailedHandlerDelegate.cs
  5. +9
    -0
      Source/MQTTnet.Extensions.ManagedClient/IApplicationMessageProcessedHandler.cs
  6. +9
    -0
      Source/MQTTnet.Extensions.ManagedClient/IApplicationMessageSkippedHandler.cs
  7. +9
    -0
      Source/MQTTnet.Extensions.ManagedClient/IConnectingFailedHandler.cs
  8. +5
    -5
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
  9. +0
    -1
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs
  10. +9
    -0
      Source/MQTTnet.Extensions.ManagedClient/ISynchronizingSubscriptionsFailedHandler.cs
  11. +48
    -20
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  12. +2
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs
  13. +2
    -2
      Source/MQTTnet.Extensions.ManagedClient/ManagedProcessFailedEventArgs.cs
  14. +3
    -4
      Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs
  15. +31
    -0
      Source/MQTTnet.Extensions.ManagedClient/SynchronizingSubscriptionsFailedHandlerDelegate.cs
  16. +15
    -4
      Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
  17. +31
    -0
      Source/MQTTnet.Extensions.Rpc/RpcAwareApplicationMessageReceivedHandler.cs
  18. +2
    -2
      Source/MQTTnet/Client/MqttClient.cs
  19. +3
    -3
      Source/MQTTnet/Client/MqttClientExtensions.cs
  20. +0
    -9
      Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs
  21. +9
    -0
      Source/MQTTnet/Client/Receiving/IMqttApplicationMessageReceivedHandler.cs
  22. +4
    -4
      Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs
  23. +1
    -1
      Source/MQTTnet/IApplicationMessageReceiver.cs
  24. +11
    -0
      Source/MQTTnet/IMqttFactory.cs
  25. +8
    -9
      Source/MQTTnet/MqttFactory.cs
  26. +3
    -4
      Source/MQTTnet/Server/IMqttServer.cs
  27. +10
    -0
      Source/MQTTnet/Server/IMqttServerStartedHandler.cs
  28. +10
    -0
      Source/MQTTnet/Server/IMqttServerStoppedHandler.cs
  29. +1
    -1
      Source/MQTTnet/Server/MqttClientConnection.cs
  30. +2
    -5
      Source/MQTTnet/Server/MqttClientSessionsManager.cs
  31. +16
    -5
      Source/MQTTnet/Server/MqttServer.cs
  32. +2
    -2
      Source/MQTTnet/Server/MqttServerEventDispatcher.cs
  33. +31
    -0
      Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs
  34. +31
    -0
      Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs
  35. +1
    -0
      Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj
  36. +1
    -1
      Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs
  37. +1
    -1
      Tests/MQTTnet.Core.Tests/MqttFactoryTests.cs
  38. +55
    -0
      Tests/MQTTnet.Core.Tests/RPC_Tests.cs
  39. +1
    -1
      Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs
  40. +1
    -1
      Tests/MQTTnet.Core.Tests/Server_Tests.cs
  41. +2
    -3
      Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs
  42. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/ClientTest.cs
  43. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs
  44. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs
  45. +1
    -1
      Tests/MQTTnet.TestApp.NetCore/ServerTest.cs

+ 12
- 1
README.md View File

@@ -26,7 +26,8 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Uniform API across all supported versions of the MQTT protocol
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (~130 tests)
* Unit tested (~150 tests)
* No external dependencies

\* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_.

@@ -51,6 +52,16 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Validate subscriptions and deny subscribing of certain topics depending on requesting clients
* Connect clients with different protocol versions at the same time.

### MQTTnet Server

_MQTTnet Server_ is a reference implementation of a MQTT server using this library. It has the following features.
* Running portable (no installation required)
* Python scripting support for manipulating messages, validation of clients etc.
* Runs und Windows, Linux, macOS, Raspberry Pi
* Supports WebSocket and TCP (with and without TLS) connections
* Provides a HTTP based API (including Swagger endpoint)
* Extensive configuration

## Supported frameworks

* .NET Standard 1.3+


+ 31
- 0
Source/MQTTnet.Extensions.ManagedClient/ApplicationMessageProcessedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public class ApplicationMessageProcessedHandlerDelegate : IApplicationMessageProcessedHandler
{
private readonly Func<ApplicationMessageProcessedEventArgs, Task> _handler;

public ApplicationMessageProcessedHandlerDelegate(Action<ApplicationMessageProcessedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public ApplicationMessageProcessedHandlerDelegate(Func<ApplicationMessageProcessedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleApplicationMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 31
- 0
Source/MQTTnet.Extensions.ManagedClient/ApplicationMessageSkippedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public class ApplicationMessageSkippedHandlerDelegate : IApplicationMessageSkippedHandler
{
private readonly Func<ApplicationMessageSkippedEventArgs, Task> _handler;

public ApplicationMessageSkippedHandlerDelegate(Action<ApplicationMessageSkippedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public ApplicationMessageSkippedHandlerDelegate(Func<ApplicationMessageSkippedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleApplicationMessageSkippedAsync(ApplicationMessageSkippedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 31
- 0
Source/MQTTnet.Extensions.ManagedClient/ConnectingFailedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public class ConnectingFailedHandlerDelegate : IConnectingFailedHandler
{
private readonly Func<ManagedProcessFailedEventArgs, Task> _handler;

public ConnectingFailedHandlerDelegate(Action<ManagedProcessFailedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public ConnectingFailedHandlerDelegate(Func<ManagedProcessFailedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleConnectingFailedAsync(ManagedProcessFailedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 9
- 0
Source/MQTTnet.Extensions.ManagedClient/IApplicationMessageProcessedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public interface IApplicationMessageProcessedHandler
{
Task HandleApplicationMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArgs);
}
}

+ 9
- 0
Source/MQTTnet.Extensions.ManagedClient/IApplicationMessageSkippedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public interface IApplicationMessageSkippedHandler
{
Task HandleApplicationMessageSkippedAsync(ApplicationMessageSkippedEventArgs eventArgs);
}
}

+ 9
- 0
Source/MQTTnet.Extensions.ManagedClient/IConnectingFailedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public interface IConnectingFailedHandler
{
Task HandleConnectingFailedAsync(ManagedProcessFailedEventArgs eventArgs);
}
}

+ 5
- 5
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs View File

@@ -15,13 +15,13 @@ namespace MQTTnet.Extensions.ManagedClient

IMqttClientConnectedHandler ConnectedHandler { get; set; }
IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;

event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }
IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

IConnectingFailedHandler ConnectingFailedHandler { get; set; }
ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }
Task StartAsync(IManagedMqttClientOptions options);
Task StopAsync();



+ 0
- 1
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientOptions.cs View File

@@ -1,5 +1,4 @@
using System;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Server;



+ 9
- 0
Source/MQTTnet.Extensions.ManagedClient/ISynchronizingSubscriptionsFailedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public interface ISynchronizingSubscriptionsFailedHandler
{
Task HandleSynchronizingSubscriptionsFailedAsync(ManagedProcessFailedEventArgs eventArgs);
}
}

+ 48
- 20
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -57,17 +57,19 @@ namespace MQTTnet.Extensions.ManagedClient
set => _mqttClient.DisconnectedHandler = value;
}

public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => _mqttClient.ApplicationMessageReceivedHandler;
set => _mqttClient.ApplicationMessageReceivedHandler = value;
}

public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;
public IApplicationMessageProcessedHandler ApplicationMessageProcessedHandler { get; set; }

public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
public IApplicationMessageSkippedHandler ApplicationMessageSkippedHandler { get; set; }

public IConnectingFailedHandler ConnectingFailedHandler { get; set; }

public ISynchronizingSubscriptionsFailedHandler SynchronizingSubscriptionsFailedHandler { get; set; }

public async Task StartAsync(IManagedMqttClientOptions options)
{
@@ -126,28 +128,45 @@ namespace MQTTnet.Extensions.ManagedClient
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

ManagedMqttApplicationMessage removedMessage = null;
lock (_messageQueue)
ApplicationMessageSkippedEventArgs applicationMessageSkippedEventArgs = null;

try
{
if (_messageQueue.Count >= Options.MaxPendingMessages)
lock (_messageQueue)
{
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
if (_messageQueue.Count >= Options.MaxPendingMessages)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
return;
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(applicationMessage);
return;
}

if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
applicationMessageSkippedEventArgs = new ApplicationMessageSkippedEventArgs(removedMessage);
}
}

if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
_messageQueue.Enqueue(applicationMessage);
}
}
finally
{
if (applicationMessageSkippedEventArgs != null)
{
var applicationMessageSkippedHandler = ApplicationMessageSkippedHandler;
if (applicationMessageSkippedHandler != null)
{
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(removedMessage));
await applicationMessageSkippedHandler.HandleApplicationMessageSkippedAsync(applicationMessageSkippedEventArgs).ConfigureAwait(false);
}
}

_messageQueue.Enqueue(applicationMessage);
}
if (_storageManager != null)
{
if (removedMessage != null)
@@ -352,7 +371,7 @@ namespace MQTTnet.Extensions.ManagedClient
}
finally
{
ApplicationMessageProcessed?.Invoke(this, new ApplicationMessageProcessedEventArgs(message, transmitException));
ApplicationMessageProcessedHandler?.HandleApplicationMessageProcessedAsync(new ApplicationMessageProcessedEventArgs(message, transmitException)).GetAwaiter().GetResult();
}
}

@@ -395,7 +414,11 @@ namespace MQTTnet.Extensions.ManagedClient
_logger.Warning(exception, "Synchronizing subscriptions failed.");
_subscriptionsNotPushed = true;

SynchronizingSubscriptionsFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
var synchronizingSubscriptionsFailedHandler = SynchronizingSubscriptionsFailedHandler;
if (SynchronizingSubscriptionsFailedHandler != null)
{
await synchronizingSubscriptionsFailedHandler.HandleSynchronizingSubscriptionsFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
}
}
}

@@ -413,7 +436,12 @@ namespace MQTTnet.Extensions.ManagedClient
}
catch (Exception exception)
{
ConnectingFailed?.Invoke(this, new MqttManagedProcessFailedEventArgs(exception));
var connectingFailedHandler = ConnectingFailedHandler;
if (connectingFailedHandler != null)
{
await connectingFailedHandler.HandleConnectingFailedAsync(new ManagedProcessFailedEventArgs(exception)).ConfigureAwait(false);
}

return ReconnectionResult.NotConnected;
}
}


+ 2
- 2
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs View File

@@ -68,7 +68,7 @@ namespace MQTTnet.Extensions.ManagedClient
return client;
}

client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler);

return client;
}
@@ -81,7 +81,7 @@ namespace MQTTnet.Extensions.ManagedClient
return client;
}

client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler);

return client;
}


Source/MQTTnet.Extensions.ManagedClient/MqttManagedProcessFailedEventArgs.cs → Source/MQTTnet.Extensions.ManagedClient/ManagedProcessFailedEventArgs.cs View File

@@ -2,9 +2,9 @@

namespace MQTTnet.Extensions.ManagedClient
{
public class MqttManagedProcessFailedEventArgs : EventArgs
public class ManagedProcessFailedEventArgs : EventArgs
{
public MqttManagedProcessFailedEventArgs(Exception exception)
public ManagedProcessFailedEventArgs(Exception exception)
{
Exception = exception;
}

+ 3
- 4
Source/MQTTnet.Extensions.ManagedClient/MqttFactoryExtensions.cs View File

@@ -1,19 +1,18 @@
using System;
using MQTTnet.Client;
using MQTTnet.Diagnostics;

namespace MQTTnet.Extensions.ManagedClient
{
public static class MqttFactoryExtensions
{
public static IManagedMqttClient CreateManagedMqttClient(this IMqttClientFactory factory)
public static IManagedMqttClient CreateManagedMqttClient(this IMqttFactory factory)
{
if (factory == null) throw new ArgumentNullException(nameof(factory));

return new ManagedMqttClient(factory.CreateMqttClient(), new MqttNetLogger().CreateChildLogger());
return new ManagedMqttClient(factory.CreateMqttClient(), factory.DefaultLogger.CreateChildLogger());
}

public static IManagedMqttClient CreateManagedMqttClient(this IMqttClientFactory factory, IMqttNetLogger logger)
public static IManagedMqttClient CreateManagedMqttClient(this IMqttFactory factory, IMqttNetLogger logger)
{
if (factory == null) throw new ArgumentNullException(nameof(factory));
if (logger == null) throw new ArgumentNullException(nameof(logger));


+ 31
- 0
Source/MQTTnet.Extensions.ManagedClient/SynchronizingSubscriptionsFailedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Extensions.ManagedClient
{
public class SynchronizingSubscriptionsFailedHandlerDelegate : ISynchronizingSubscriptionsFailedHandler
{
private readonly Func<ManagedProcessFailedEventArgs, Task> _handler;

public SynchronizingSubscriptionsFailedHandlerDelegate(Action<ManagedProcessFailedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public SynchronizingSubscriptionsFailedHandlerDelegate(Func<ManagedProcessFailedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleSynchronizingSubscriptionsFailedAsync(ManagedProcessFailedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 15
- 4
Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs View File

@@ -4,7 +4,6 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;

@@ -14,12 +13,17 @@ namespace MQTTnet.Extensions.Rpc
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> _waitingCalls = new ConcurrentDictionary<string, TaskCompletionSource<byte[]>>();
private readonly IMqttClient _mqttClient;
private readonly RpcAwareApplicationMessageReceivedHandler _applicationMessageReceivedHandler;

public MqttRpcClient(IMqttClient mqttClient)
{
_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

_mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(HandleReceivedApplicationMessageAsync);
_applicationMessageReceivedHandler = new RpcAwareApplicationMessageReceivedHandler(
mqttClient.ApplicationMessageReceivedHandler,
HandleApplicationMessageReceivedAsync);

_mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler;
}

public Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
@@ -46,6 +50,11 @@ namespace MQTTnet.Extensions.Rpc
throw new ArgumentException("The method name cannot contain /, + or #.");
}

if (!(_mqttClient.ApplicationMessageReceivedHandler is RpcAwareApplicationMessageReceivedHandler))
{
throw new InvalidOperationException("The application message received handler was modified.");
}

var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{methodName}";
var responseTopic = requestTopic + "/response";

@@ -103,7 +112,7 @@ namespace MQTTnet.Extensions.Rpc
}
}

private Task HandleReceivedApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
private Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
if (!_waitingCalls.TryRemove(eventArgs.ApplicationMessage.Topic, out var tcs))
{
@@ -122,9 +131,11 @@ namespace MQTTnet.Extensions.Rpc

public void Dispose()
{
_mqttClient.ApplicationMessageReceivedHandler = _applicationMessageReceivedHandler.OriginalHandler;

foreach (var tcs in _waitingCalls)
{
tcs.Value.SetCanceled();
tcs.Value.TrySetCanceled();
}

_waitingCalls.Clear();


+ 31
- 0
Source/MQTTnet.Extensions.Rpc/RpcAwareApplicationMessageReceivedHandler.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;

namespace MQTTnet.Extensions.Rpc
{
public class RpcAwareApplicationMessageReceivedHandler : IMqttApplicationMessageReceivedHandler
{
private readonly Func<MqttApplicationMessageReceivedEventArgs, Task> _handleReceivedApplicationMessageAsync;

public RpcAwareApplicationMessageReceivedHandler(
IMqttApplicationMessageReceivedHandler originalHandler,
Func<MqttApplicationMessageReceivedEventArgs, Task> handleReceivedApplicationMessageAsync)
{
OriginalHandler = originalHandler;
_handleReceivedApplicationMessageAsync = handleReceivedApplicationMessageAsync ?? throw new ArgumentNullException(nameof(handleReceivedApplicationMessageAsync));
}

public IMqttApplicationMessageReceivedHandler OriginalHandler { get; }

public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
if (OriginalHandler != null)
{
await OriginalHandler.HandleApplicationMessageReceivedAsync(eventArgs).ConfigureAwait(false);
}

await _handleReceivedApplicationMessageAsync(eventArgs).ConfigureAwait(false);
}
}
}

+ 2
- 2
Source/MQTTnet/Client/MqttClient.cs View File

@@ -48,7 +48,7 @@ namespace MQTTnet.Client

public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }

public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }

public bool IsConnected { get; private set; }

@@ -579,7 +579,7 @@ namespace MQTTnet.Client
var handler = ApplicationMessageReceivedHandler;
if (handler != null)
{
return handler.HandleApplicationMessageAsync(
return handler.HandleApplicationMessageReceivedAsync(
new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));
}



+ 3
- 3
Source/MQTTnet/Client/MqttClientExtensions.cs View File

@@ -71,7 +71,7 @@ namespace MQTTnet.Client
return client;
}

client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler);

return client;
}
@@ -84,12 +84,12 @@ namespace MQTTnet.Client
return client;
}

client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(handler);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(handler);

return client;
}

public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageHandler handler)
public static IMqttClient UseApplicationMessageReceivedHandler(this IMqttClient client, IMqttApplicationMessageReceivedHandler handler)
{
client.ApplicationMessageReceivedHandler = handler;



+ 0
- 9
Source/MQTTnet/Client/Receiving/IMqttApplicationMessageHandler.cs View File

@@ -1,9 +0,0 @@
using System.Threading.Tasks;

namespace MQTTnet.Client.Receiving
{
public interface IMqttApplicationMessageHandler
{
Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs eventArgs);
}
}

+ 9
- 0
Source/MQTTnet/Client/Receiving/IMqttApplicationMessageReceivedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Client.Receiving
{
public interface IMqttApplicationMessageReceivedHandler
{
Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs);
}
}

Source/MQTTnet/Client/Receiving/MqttApplicationMessageHandlerDelegate.cs → Source/MQTTnet/Client/Receiving/MqttApplicationMessageReceivedHandlerDelegate.cs View File

@@ -3,11 +3,11 @@ using System.Threading.Tasks;

namespace MQTTnet.Client.Receiving
{
public class MqttApplicationMessageHandlerDelegate : IMqttApplicationMessageHandler
public class MqttApplicationMessageReceivedHandlerDelegate : IMqttApplicationMessageReceivedHandler
{
private readonly Func<MqttApplicationMessageReceivedEventArgs, Task> _handler;

public MqttApplicationMessageHandlerDelegate(Action<MqttApplicationMessageReceivedEventArgs> handler)
public MqttApplicationMessageReceivedHandlerDelegate(Action<MqttApplicationMessageReceivedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

@@ -18,12 +18,12 @@ namespace MQTTnet.Client.Receiving
};
}

public MqttApplicationMessageHandlerDelegate(Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
public MqttApplicationMessageReceivedHandlerDelegate(Func<MqttApplicationMessageReceivedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleApplicationMessageAsync(MqttApplicationMessageReceivedEventArgs context)
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs context)
{
return _handler(context);
}

+ 1
- 1
Source/MQTTnet/IApplicationMessageReceiver.cs View File

@@ -4,6 +4,6 @@ namespace MQTTnet
{
public interface IApplicationMessageReceiver
{
IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }
IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }
}
}

+ 11
- 0
Source/MQTTnet/IMqttFactory.cs View File

@@ -0,0 +1,11 @@
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Server;

namespace MQTTnet
{
public interface IMqttFactory : IMqttClientFactory, IMqttServerFactory
{
IMqttNetLogger DefaultLogger { get; }
}
}

+ 8
- 9
Source/MQTTnet/MqttFactory.cs View File

@@ -8,22 +8,22 @@ using MQTTnet.Server;

namespace MQTTnet
{
public class MqttFactory : IMqttClientFactory, IMqttServerFactory
public class MqttFactory : IMqttFactory
{
private readonly IMqttNetLogger _logger;

public MqttFactory() : this(new MqttNetLogger())
{
}

public MqttFactory(IMqttNetLogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
DefaultLogger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public IMqttNetLogger DefaultLogger { get; }

public IMqttClient CreateMqttClient()
{
return CreateMqttClient(new MqttNetLogger());
return CreateMqttClient(DefaultLogger);
}

public IMqttClient CreateMqttClient(IMqttNetLogger logger)
@@ -37,7 +37,7 @@ namespace MQTTnet
{
if (adapterFactory == null) throw new ArgumentNullException(nameof(adapterFactory));

return new MqttClient(adapterFactory, new MqttNetLogger());
return new MqttClient(adapterFactory, DefaultLogger);
}

public IMqttClient CreateMqttClient(IMqttNetLogger logger, IMqttClientAdapterFactory adapterFactory)
@@ -50,8 +50,7 @@ namespace MQTTnet

public IMqttServer CreateMqttServer()
{
var logger = new MqttNetLogger();
return CreateMqttServer(logger);
return CreateMqttServer(DefaultLogger);
}

public IMqttServer CreateMqttServer(IMqttNetLogger logger)
@@ -73,7 +72,7 @@ namespace MQTTnet
{
if (adapters == null) throw new ArgumentNullException(nameof(adapters));
return new MqttServer(adapters, _logger.CreateChildLogger());
return new MqttServer(adapters, DefaultLogger.CreateChildLogger());
}
}
}

+ 3
- 4
Source/MQTTnet/Server/IMqttServer.cs View File

@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Server.Status;

@@ -7,8 +6,8 @@ namespace MQTTnet.Server
{
public interface IMqttServer : IApplicationMessageReceiver, IApplicationMessagePublisher
{
event EventHandler Started;
event EventHandler Stopped;
IMqttServerStartedHandler StartedHandler { get; set; }
IMqttServerStoppedHandler StoppedHandler { get; set; }

IMqttServerClientConnectedHandler ClientConnectedHandler { get; set; }
IMqttServerClientDisconnectedHandler ClientDisconnectedHandler { get; set; }


+ 10
- 0
Source/MQTTnet/Server/IMqttServerStartedHandler.cs View File

@@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerStartedHandler
{
Task HandleServerStartedAsync(EventArgs eventArgs);
}
}

+ 10
- 0
Source/MQTTnet/Server/IMqttServerStoppedHandler.cs View File

@@ -0,0 +1,10 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public interface IMqttServerStoppedHandler
{
Task HandleServerStoppedAsync(EventArgs eventArgs);
}
}

+ 1
- 1
Source/MQTTnet/Server/MqttClientConnection.cs View File

@@ -369,7 +369,7 @@ namespace MQTTnet.Server
//// }
////});

await Task.FromResult(0);
//await Task.FromResult(0);
}

private async Task SendPendingPacketsAsync()


+ 2
- 5
Source/MQTTnet/Server/MqttClientSessionsManager.cs View File

@@ -15,6 +15,7 @@ namespace MQTTnet.Server
{
private readonly BlockingCollection<MqttEnqueuedApplicationMessage> _messageQueue = new BlockingCollection<MqttEnqueuedApplicationMessage>();

private readonly SemaphoreSlim _createConnectionGate = new SemaphoreSlim(1, 1);
private readonly ConcurrentDictionary<string, MqttClientConnection> _connections = new ConcurrentDictionary<string, MqttClientConnection>();
private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new ConcurrentDictionary<string, MqttClientSession>();
@@ -49,8 +50,6 @@ namespace MQTTnet.Server

public async Task StopAsync()
{
//using (await _sessionsLock.WaitAsync(CancellationToken.None).ConfigureAwait(false))

foreach (var connection in _connections.Values)
{
await connection.StopAsync().ConfigureAwait(false);
@@ -133,7 +132,7 @@ namespace MQTTnet.Server
{
if (_connections.TryGetValue(clientId, out var connection))
{
await connection.StopAsync();
await connection.StopAsync().ConfigureAwait(false);
}

if (_sessions.TryRemove(clientId, out var session))
@@ -313,8 +312,6 @@ namespace MQTTnet.Server
return context;
}

private readonly SemaphoreSlim _createConnectionGate = new SemaphoreSlim(1, 1);

private async Task<MqttClientConnection> CreateConnectionAsync(IMqttChannelAdapter channelAdapter, MqttConnectPacket connectPacket)
{
await _createConnectionGate.WaitAsync(_cancellationToken).ConfigureAwait(false);


+ 16
- 5
Source/MQTTnet/Server/MqttServer.cs View File

@@ -30,8 +30,9 @@ namespace MQTTnet.Server
_logger = logger.CreateChildLogger(nameof(MqttServer));
}

public event EventHandler Started;
public event EventHandler Stopped;
public IMqttServerStartedHandler StartedHandler { get; set; }

public IMqttServerStoppedHandler StoppedHandler { get; set; }

public IMqttServerClientConnectedHandler ClientConnectedHandler
{
@@ -57,7 +58,7 @@ namespace MQTTnet.Server
set => _eventDispatcher.ClientUnsubscribedTopicHandler = value;
}
public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler
{
get => _eventDispatcher.ApplicationMessageReceivedHandler;
set => _eventDispatcher.ApplicationMessageReceivedHandler = value;
@@ -128,7 +129,12 @@ namespace MQTTnet.Server
}

_logger.Info("Started.");
Started?.Invoke(this, EventArgs.Empty);

var startedHandler = StartedHandler;
if (startedHandler != null)
{
await startedHandler.HandleServerStartedAsync(EventArgs.Empty).ConfigureAwait(false);
}
}

public async Task StopAsync()
@@ -151,7 +157,6 @@ namespace MQTTnet.Server
}

_logger.Info("Stopped.");
Stopped?.Invoke(this, EventArgs.Empty);
}
finally
{
@@ -163,6 +168,12 @@ namespace MQTTnet.Server
_clientSessionsManager?.Dispose();
_clientSessionsManager = null;
}

var stoppedHandler = StoppedHandler;
if (stoppedHandler != null)
{
await stoppedHandler.HandleServerStoppedAsync(EventArgs.Empty).ConfigureAwait(false);
}
}

public Task ClearRetainedMessagesAsync()


+ 2
- 2
Source/MQTTnet/Server/MqttServerEventDispatcher.cs View File

@@ -13,7 +13,7 @@ namespace MQTTnet.Server

public IMqttServerClientUnsubscribedTopicHandler ClientUnsubscribedTopicHandler { get; set; }

public IMqttApplicationMessageHandler ApplicationMessageReceivedHandler { get; set; }
public IMqttApplicationMessageReceivedHandler ApplicationMessageReceivedHandler { get; set; }

public Task HandleClientConnectedAsync(string clientId)
{
@@ -67,7 +67,7 @@ namespace MQTTnet.Server
return Task.FromResult(0);
}

return handler.HandleApplicationMessageAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
return handler.HandleApplicationMessageReceivedAsync(new MqttApplicationMessageReceivedEventArgs(senderClientId, applicationMessage));
}
}
}

+ 31
- 0
Source/MQTTnet/Server/MqttServerStartedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerStartedHandlerDelegate : IMqttServerStartedHandler
{
private readonly Func<EventArgs, Task> _handler;

public MqttServerStartedHandlerDelegate(Action<EventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public MqttServerStartedHandlerDelegate(Func<EventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleServerStartedAsync(EventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 31
- 0
Source/MQTTnet/Server/MqttServerStoppedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Server
{
public class MqttServerStoppedHandlerDelegate : IMqttServerStoppedHandler
{
private readonly Func<EventArgs, Task> _handler;

public MqttServerStoppedHandlerDelegate(Action<EventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = eventArgs =>
{
handler(eventArgs);
return Task.FromResult(0);
};
}

public MqttServerStoppedHandlerDelegate(Func<EventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleServerStoppedAsync(EventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 1
- 0
Tests/MQTTnet.Core.Tests/MQTTnet.Tests.csproj View File

@@ -14,6 +14,7 @@

<ItemGroup>
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.ManagedClient\MQTTnet.Extensions.ManagedClient.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" />
</ItemGroup>


+ 1
- 1
Tests/MQTTnet.Core.Tests/MQTTv5/Client_Tests.cs View File

@@ -214,7 +214,7 @@ namespace MQTTnet.Tests.MQTTv5
var receivedMessages = new List<MqttApplicationMessageReceivedEventArgs>();

await client1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").WithClientId("client1").WithProtocolVersion(MqttProtocolVersion.V500).Build());
client1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
client1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
{
lock (receivedMessages)
{


+ 1
- 1
Tests/MQTTnet.Core.Tests/MqttFactoryTests.cs View File

@@ -59,7 +59,7 @@ namespace MQTTnet.Tests

//wait at least connect timeout or we have some log messages
var tcs = new TaskCompletionSource<object>();
managedClient.ConnectingFailed += (s, e) => tcs.TrySetResult(null);
managedClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(e => tcs.TrySetResult(null));
await Task.WhenAny(Task.Delay(managedClient.Options.ClientOptions.CommunicationTimeout), tcs.Task);
}
finally


+ 55
- 0
Tests/MQTTnet.Core.Tests/RPC_Tests.cs View File

@@ -0,0 +1,55 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Tests.Mockups;
using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using MQTTnet.Extensions.Rpc;
using MQTTnet.Protocol;

namespace MQTTnet.Tests
{
[TestClass]
public class RPC_Tests
{
[TestMethod]
public async Task Execute_Success()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync();
var responseSender = await testEnvironment.ConnectClientAsync();
await responseSender.SubscribeAsync("MQTTnet.RPC/+/ping");

responseSender.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(async e =>
{
await responseSender.PublishAsync(e.ApplicationMessage.Topic + "/response", "pong");
});

var requestSender = await testEnvironment.ConnectClientAsync();

var rpcClient = new MqttRpcClient(requestSender);
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}

[TestMethod]
[ExpectedException(typeof(MqttCommunicationTimedOutException))]
public async Task Execute_Timeout()
{
using (var testEnvironment = new TestEnvironment())
{
await testEnvironment.StartServerAsync();
var requestSender = await testEnvironment.ConnectClientAsync();

var rpcClient = new MqttRpcClient(requestSender);
await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(2), "ping", "", MqttQualityOfServiceLevel.AtMostOnce);
}
}
}
}

+ 1
- 1
Tests/MQTTnet.Core.Tests/RoundtripTimeTests.cs View File

@@ -30,7 +30,7 @@ namespace MQTTnet.Tests

TaskCompletionSource<string> response = null;

receiverClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(args =>
receiverClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(args =>
{
response?.SetResult(args.ApplicationMessage.ConvertPayloadToString());
});


+ 1
- 1
Tests/MQTTnet.Core.Tests/Server_Tests.cs View File

@@ -68,7 +68,7 @@ namespace MQTTnet.Tests
var clientOptions = new MqttClientOptionsBuilder().WithWillMessage(willMessage);

var c1 = await testEnvironment.ConnectClientAsync();
c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
c1.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(c => Interlocked.Increment(ref receivedMessagesCount));
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic("#").Build());

var c2 = await testEnvironment.ConnectClientAsync(clientOptions);


+ 2
- 3
Tests/MQTTnet.TestApp.AspNetCore2/Startup.cs View File

@@ -5,7 +5,6 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.FileProviders;
using MQTTnet;
using MQTTnet.AspNetCore;
using MQTTnet.Server;

@@ -36,7 +35,7 @@ namespace MQTTnet.TestApp.AspNetCore2
//app.UseMqttEndpoint();
app.UseMqttServer(server =>
{
server.Started += async (sender, args) =>
server.StartedHandler = new MqttServerStartedHandlerDelegate(async args =>
{
var msg = new MqttApplicationMessageBuilder()
.WithPayload("Mqtt is awesome")
@@ -58,7 +57,7 @@ namespace MQTTnet.TestApp.AspNetCore2
await Task.Delay(TimeSpan.FromSeconds(2));
}
}
};
});
});

app.Use((context, next) =>


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/ClientTest.cs View File

@@ -28,7 +28,7 @@ namespace MQTTnet.TestApp.NetCore
}
};

client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/ManagedClientTest.cs View File

@@ -35,7 +35,7 @@ namespace MQTTnet.TestApp.NetCore
try
{
var managedClient = new MqttFactory().CreateManagedMqttClient();
managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
managedClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
{
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic);
});


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/PublicBrokerTest.cs View File

@@ -81,7 +81,7 @@ namespace MQTTnet.TestApp.NetCore
var topic = Guid.NewGuid().ToString();

MqttApplicationMessage receivedMessage = null;
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e => receivedMessage = e.ApplicationMessage);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e => receivedMessage = e.ApplicationMessage);

await client.ConnectAsync(options);
await client.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce);


+ 1
- 1
Tests/MQTTnet.TestApp.NetCore/ServerTest.cs View File

@@ -79,7 +79,7 @@ namespace MQTTnet.TestApp.NetCore

var mqttServer = new MqttFactory().CreateMqttServer();

mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageHandlerDelegate(e =>
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
{
MqttNetConsoleLogger.PrintToConsole(
$"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0])}'",


Loading…
Cancel
Save