Browse Source

Merge pull request #551 from dpsenner/feature/add-async-handlers-for-connected-and-disconnected

Add async handler interfaces for connected and disconnected to allow async code on connect
release/3.x.x
Christian 5 years ago
committed by GitHub
parent
commit
087c622122
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 194 additions and 11 deletions
  1. +6
    -1
      Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs
  2. +16
    -7
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
  3. +53
    -0
      Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs
  4. +9
    -0
      Source/MQTTnet/Client/Connecting/IMqttClientConnectedHandler.cs
  5. +31
    -0
      Source/MQTTnet/Client/Connecting/MqttClientConnectedHandlerDelegate.cs
  6. +9
    -0
      Source/MQTTnet/Client/Disconnecting/IMqttClientDisconnectedHandler.cs
  7. +31
    -0
      Source/MQTTnet/Client/Disconnecting/MqttClientDisconnectedHandlerDelegate.cs
  8. +5
    -0
      Source/MQTTnet/Client/IMqttClient.cs
  9. +8
    -1
      Source/MQTTnet/Client/MqttClient.cs
  10. +26
    -2
      Source/MQTTnet/Client/MqttClientExtensions.cs

+ 6
- 1
Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClient.cs View File

@@ -13,7 +13,12 @@ namespace MQTTnet.Extensions.ManagedClient
int PendingApplicationMessagesCount { get; }
IManagedMqttClientOptions Options { get; }

IMqttClientConnectedHandler ConnectedHandler { get; set; }
[Obsolete("Use ConnectedHandler instead.")]
event EventHandler<MqttClientConnectedEventArgs> Connected;

IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
[Obsolete("Use DisconnectedHandler instead.")]
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
@@ -21,7 +26,7 @@ namespace MQTTnet.Extensions.ManagedClient

event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
Task StartAsync(IManagedMqttClientOptions options);
Task StopAsync();



+ 16
- 7
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs View File

@@ -4,6 +4,8 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
@@ -38,9 +40,9 @@ namespace MQTTnet.Extensions.ManagedClient

_mqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));

_mqttClient.Connected += OnConnected;
_mqttClient.Disconnected += OnDisconnected;
_mqttClient.ApplicationMessageReceived += OnApplicationMessageReceived;
_mqttClient.UseConnectedHandler(OnConnected);
_mqttClient.UseDisconnectedHandler(OnDisconnected);
_mqttClient.UseReceivedApplicationMessageHandler(OnApplicationMessageReceived);

_logger = logger.CreateChildLogger(nameof(ManagedMqttClient));
}
@@ -50,9 +52,13 @@ namespace MQTTnet.Extensions.ManagedClient
public int PendingApplicationMessagesCount => _messageQueue.Count;
public IManagedMqttClientOptions Options { get; private set; }

public IMqttClientConnectedHandler ConnectedHandler { get; set; }
public event EventHandler<MqttClientConnectedEventArgs> Connected;

public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;


public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler
{
get => _mqttClient.ReceivedApplicationMessageHandler;
@@ -415,19 +421,22 @@ namespace MQTTnet.Extensions.ManagedClient
}
}

private void OnApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs eventArgs)
private Task OnApplicationMessageReceived(MqttApplicationMessageHandlerContext context)
{
ApplicationMessageReceived?.Invoke(this, eventArgs);
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(context.SenderClientId, context.ApplicationMessage));
return Task.FromResult(0);
}

private void OnDisconnected(object sender, MqttClientDisconnectedEventArgs eventArgs)
private Task OnDisconnected(MqttClientDisconnectedEventArgs eventArgs)
{
Disconnected?.Invoke(this, eventArgs);
return DisconnectedHandler?.HandleDisconnectedAsync(eventArgs);
}

private void OnConnected(object sender, MqttClientConnectedEventArgs eventArgs)
private Task OnConnected(MqttClientConnectedEventArgs eventArgs)
{
Connected?.Invoke(this, eventArgs);
return ConnectedHandler?.HandleConnectedAsync(eventArgs);
}

private void StartPublishing()


+ 53
- 0
Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientExtensions.cs View File

@@ -2,13 +2,66 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.ManagedClient
{
public static class ManagedMqttClientExtensions
{
public static IManagedMqttClient UseConnectedHandler(this IManagedMqttClient client, Func<MqttClientConnectedEventArgs, Task> handler)
{
if (handler == null)
{
client.ConnectedHandler = null;
return client;
}

client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler);
return client;
}

public static IManagedMqttClient UseDisconnectedHandler(this IManagedMqttClient client, Func<MqttClientDisconnectedEventArgs, Task> handler)
{
if (handler == null)
{
client.DisconnectedHandler = null;
return client;
}

client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler);
return client;
}

public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Func<MqttApplicationMessageHandlerContext, Task> handler)
{
if (handler == null)
{
client.ReceivedApplicationMessageHandler = null;
return client;
}

client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);

return client;
}

public static IManagedMqttClient UseReceivedApplicationMessageHandler(this IManagedMqttClient client, Action<MqttApplicationMessageHandlerContext> handler)
{
if (handler == null)
{
client.ReceivedApplicationMessageHandler = null;
return client;
}

client.ReceivedApplicationMessageHandler = new MqttApplicationMessageHandlerDelegate(handler);

return client;
}

public static Task SubscribeAsync(this IManagedMqttClient managedClient, params TopicFilter[] topicFilters)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));


+ 9
- 0
Source/MQTTnet/Client/Connecting/IMqttClientConnectedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Client.Connecting
{
public interface IMqttClientConnectedHandler
{
Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs);
}
}

+ 31
- 0
Source/MQTTnet/Client/Connecting/MqttClientConnectedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Client.Connecting
{
public class MqttClientConnectedHandlerDelegate : IMqttClientConnectedHandler
{
private readonly Func<MqttClientConnectedEventArgs, Task> _handler;

public MqttClientConnectedHandlerDelegate(Action<MqttClientConnectedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public MqttClientConnectedHandlerDelegate(Func<MqttClientConnectedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 9
- 0
Source/MQTTnet/Client/Disconnecting/IMqttClientDisconnectedHandler.cs View File

@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace MQTTnet.Client.Disconnecting
{
public interface IMqttClientDisconnectedHandler
{
Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs);
}
}

+ 31
- 0
Source/MQTTnet/Client/Disconnecting/MqttClientDisconnectedHandlerDelegate.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;

namespace MQTTnet.Client.Disconnecting
{
public class MqttClientDisconnectedHandlerDelegate : IMqttClientDisconnectedHandler
{
private readonly Func<MqttClientDisconnectedEventArgs, Task> _handler;

public MqttClientDisconnectedHandlerDelegate(Action<MqttClientDisconnectedEventArgs> handler)
{
if (handler == null) throw new ArgumentNullException(nameof(handler));

_handler = context =>
{
handler(context);
return Task.FromResult(0);
};
}

public MqttClientDisconnectedHandlerDelegate(Func<MqttClientDisconnectedEventArgs, Task> handler)
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

public Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
return _handler(eventArgs);
}
}
}

+ 5
- 0
Source/MQTTnet/Client/IMqttClient.cs View File

@@ -14,7 +14,12 @@ namespace MQTTnet.Client
bool IsConnected { get; }
IMqttClientOptions Options { get; }

IMqttClientConnectedHandler ConnectedHandler { get; set; }
[Obsolete("Use ConnectedHandler instead.")]
event EventHandler<MqttClientConnectedEventArgs> Connected;

IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }
[Obsolete("Use DisconnectedHandler instead.")]
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken);


+ 8
- 1
Source/MQTTnet/Client/MqttClient.cs View File

@@ -44,8 +44,12 @@ namespace MQTTnet.Client
_logger = logger.CreateChildLogger(nameof(MqttClient));
}

public IMqttClientConnectedHandler ConnectedHandler { get; set; }

public event EventHandler<MqttClientConnectedEventArgs> Connected;

public IMqttClientDisconnectedHandler DisconnectedHandler { get; set; }

public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler { get; set; }
@@ -93,10 +97,12 @@ namespace MQTTnet.Client
}

IsConnected = true;
Connected?.Invoke(this, new MqttClientConnectedEventArgs(authenticateResult));

_logger.Info("Connected.");

Connected?.Invoke(this, new MqttClientConnectedEventArgs(authenticateResult));
await ConnectedHandler?.HandleConnectedAsync(new MqttClientConnectedEventArgs(authenticateResult));

return authenticateResult;
}
catch (Exception exception)
@@ -259,6 +265,7 @@ namespace MQTTnet.Client

_logger.Info("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
await DisconnectedHandler?.HandleDisconnectedAsync(new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
}



+ 26
- 2
Source/MQTTnet/Client/MqttClientExtensions.cs View File

@@ -15,6 +15,30 @@ namespace MQTTnet.Client
{
public static class MqttClientExtensions
{
public static IMqttClient UseConnectedHandler(this IMqttClient client, Func<MqttClientConnectedEventArgs, Task> handler)
{
if (handler == null)
{
client.ConnectedHandler = null;
return client;
}

client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(handler);
return client;
}

public static IMqttClient UseDisconnectedHandler(this IMqttClient client, Func<MqttClientDisconnectedEventArgs, Task> handler)
{
if (handler == null)
{
client.DisconnectedHandler = null;
return client;
}

client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(handler);
return client;
}

public static IMqttClient UseReceivedApplicationMessageHandler(this IMqttClient client, Func<MqttApplicationMessageHandlerContext, Task> handler)
{
if (handler == null)
@@ -143,7 +167,7 @@ namespace MQTTnet.Client
await client.PublishAsync(applicationMessage).ConfigureAwait(false);
}
}
public static Task<MqttClientPublishResult> PublishAsync(this IMqttClient client, string topic)
{
if (client == null) throw new ArgumentNullException(nameof(client));
@@ -175,7 +199,7 @@ namespace MQTTnet.Client
.WithPayload(payload)
.Build());
}
public static Task<MqttClientPublishResult> PublishAsync(this IMqttClient client, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
{
if (client == null) throw new ArgumentNullException(nameof(client));


Loading…
Cancel
Save