Kaynağa Gözat

Use event pattern correctly for incoming server connections.

release/3.x.x
Christian Kratky 7 yıl önce
ebeveyn
işleme
9915cc4492
10 değiştirilmiş dosya ile 44 ekleme ve 34 silme
  1. +11
    -11
      Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs
  2. +4
    -10
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  3. +1
    -1
      MQTTnet.Core/Adapter/IMqttServerAdapter.cs
  4. +14
    -0
      MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs
  5. +2
    -3
      MQTTnet.Core/Server/MqttClientConnectedEventArgs.cs
  6. +2
    -3
      MQTTnet.Core/Server/MqttClientDisconnectedEventArgs.cs
  7. +2
    -2
      MQTTnet.Core/Server/MqttServer.cs
  8. +1
    -1
      MQTTnet.sln
  9. +2
    -2
      Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs
  10. +5
    -1
      Tests/MQTTnet.TestApp.NetCore/Program.cs

+ 11
- 11
Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs Dosyayı Görüntüle

@@ -20,15 +20,11 @@ namespace MQTTnet.Implementations
private Socket _tlsEndpointSocket;
private X509Certificate2 _tlsCertificate;

private bool _isRunning;

public event Action<IMqttCommunicationAdapter> ClientAccepted;
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;

public Task StartAsync(MqttServerOptions options)
{
if (_isRunning) throw new InvalidOperationException("Server is already started.");

_isRunning = true;
if (_cancellationTokenSource != null) throw new InvalidOperationException("Server is already started.");

_cancellationTokenSource = new CancellationTokenSource();

@@ -49,6 +45,10 @@ namespace MQTTnet.Implementations
}

_tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate);
if (!_tlsCertificate.HasPrivateKey)
{
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key.");
}

_tlsEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
_tlsEndpointSocket.Bind(new IPEndPoint(IPAddress.Any, options.GetTlsEndpointPort()));
@@ -62,8 +62,6 @@ namespace MQTTnet.Implementations

public Task StopAsync()
{
_isRunning = false;

_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
@@ -71,6 +69,8 @@ namespace MQTTnet.Implementations
_defaultEndpointSocket?.Dispose();
_defaultEndpointSocket = null;

_tlsCertificate = null;
_tlsEndpointSocket?.Dispose();
_tlsEndpointSocket = null;

@@ -88,14 +88,14 @@ namespace MQTTnet.Implementations
{
try
{
//todo: else branch can be used with min dependency NET46
//todo: else branch can be used with min dependency NET46
#if NET45
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false);
#else
var clientSocket = await _defaultEndpointSocket.AcceptAsync().ConfigureAwait(false);
#endif
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer());
ClientAccepted?.Invoke(clientAdapter);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
}
catch (Exception exception)
{
@@ -123,7 +123,7 @@ namespace MQTTnet.Implementations
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false);

var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer());
ClientAccepted?.Invoke(clientAdapter);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
}
catch (Exception exception)
{


+ 4
- 10
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs Dosyayı Görüntüle

@@ -12,18 +12,14 @@ namespace MQTTnet.Implementations
{
private StreamSocketListener _defaultEndpointSocket;

private bool _isRunning;

public event Action<IMqttCommunicationAdapter> ClientAccepted;
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;

public async Task StartAsync(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (_isRunning) throw new InvalidOperationException("Server is already started.");

_isRunning = true;

if (_defaultEndpointSocket != null) throw new InvalidOperationException("Server is already started.");
if (options.DefaultEndpointOptions.IsEnabled)
{
_defaultEndpointSocket = new StreamSocketListener();
@@ -39,8 +35,6 @@ namespace MQTTnet.Implementations

public Task StopAsync()
{
_isRunning = false;

_defaultEndpointSocket?.Dispose();
_defaultEndpointSocket = null;

@@ -57,7 +51,7 @@ namespace MQTTnet.Implementations
try
{
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttPacketSerializer());
ClientAccepted?.Invoke(clientAdapter);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
}
catch (Exception exception)
{


+ 1
- 1
MQTTnet.Core/Adapter/IMqttServerAdapter.cs Dosyayı Görüntüle

@@ -6,7 +6,7 @@ namespace MQTTnet.Core.Adapter
{
public interface IMqttServerAdapter
{
event Action<IMqttCommunicationAdapter> ClientAccepted;
event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;

Task StartAsync(MqttServerOptions options);
Task StopAsync();


+ 14
- 0
MQTTnet.Core/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs Dosyayı Görüntüle

@@ -0,0 +1,14 @@
using System;

namespace MQTTnet.Core.Adapter
{
public class MqttServerAdapterClientAcceptedEventArgs : EventArgs
{
public MqttServerAdapterClientAcceptedEventArgs(IMqttCommunicationAdapter client)
{
Client = client ?? throw new ArgumentNullException(nameof(client));
}

public IMqttCommunicationAdapter Client { get; }
}
}

MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs → MQTTnet.Core/Server/MqttClientConnectedEventArgs.cs Dosyayı Görüntüle

@@ -1,7 +1,6 @@
using MQTTnet.Core.Server;
using System;
using System;

namespace MQTTnet.Core.Adapter
namespace MQTTnet.Core.Server
{
public class MqttClientConnectedEventArgs : EventArgs
{

MQTTnet.Core/Adapter/MqttClientDisconnectedEventArgs.cs → MQTTnet.Core/Server/MqttClientDisconnectedEventArgs.cs Dosyayı Görüntüle

@@ -1,7 +1,6 @@
using MQTTnet.Core.Server;
using System;
using System;

namespace MQTTnet.Core.Adapter
namespace MQTTnet.Core.Server
{
public class MqttClientDisconnectedEventArgs : EventArgs
{

+ 2
- 2
MQTTnet.Core/Server/MqttServer.cs Dosyayı Görüntüle

@@ -77,9 +77,9 @@ namespace MQTTnet.Core.Server
MqttNetTrace.Information(nameof(MqttServer), "Stopped.");
}

private void OnClientAccepted(IMqttCommunicationAdapter adapter)
private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(adapter), _cancellationTokenSource.Token);
Task.Run(() =>_clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token);
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)


+ 1
- 1
MQTTnet.sln Dosyayı Görüntüle

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26730.16
VisualStudioVersion = 15.0.27004.2002
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject


+ 2
- 2
Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs Dosyayı Görüntüle

@@ -8,7 +8,7 @@ namespace MQTTnet.Core.Tests
{
public class TestMqttServerAdapter : IMqttServerAdapter
{
public event Action<IMqttCommunicationAdapter> ClientAccepted;
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted;

public async Task<MqttClient> ConnectTestClient(IMqttServer server, string clientId, MqttApplicationMessage willMessage = null)
{
@@ -47,7 +47,7 @@ namespace MQTTnet.Core.Tests

private void FireClientAcceptedEvent(IMqttCommunicationAdapter adapter)
{
ClientAccepted?.Invoke(adapter);
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(adapter));
}

public Task StartAsync(MqttServerOptions options)


+ 5
- 1
Tests/MQTTnet.TestApp.NetCore/Program.cs Dosyayı Görüntüle

@@ -6,6 +6,7 @@ using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -42,7 +43,7 @@ namespace MQTTnet.TestApp.NetCore
{
MqttNetTrace.TraceMessagePublished += (s, e) =>
{
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
Console.WriteLine($">> [{DateTime.Now:O}] [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");
if (e.Exception != null)
{
Console.WriteLine(e.Exception);
@@ -156,6 +157,9 @@ namespace MQTTnet.TestApp.NetCore
}
};

var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);

var mqttServer = new MqttServerFactory().CreateMqttServer(options);
mqttServer.StartAsync();



Yükleniyor…
İptal
Kaydet