From 3722bc9ae6409c5c9158ea51f286a77631530876 Mon Sep 17 00:00:00 2001 From: 1iveowl Date: Fri, 14 Jul 2017 08:26:25 +0200 Subject: [PATCH] Folder fix --- .../Implementations/MqttServerAdapter.cs | 117 +++++++++++++++++ .../Implementations/MqttTcpChannel.cs | 122 ++++++++++++++++++ .../MQTTnet.Netstandard.csproj | 21 +++ .../MQTTnet.NetStandard/MqttClientFactory.cs | 18 +++ .../MQTTnet.NetStandard/MqttServerFactory.cs | 18 +++ 5 files changed, 296 insertions(+) create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs create mode 100644 Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs create mode 100644 Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj create mode 100644 Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs create mode 100644 Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs new file mode 100644 index 0000000..0203134 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttServerAdapter.cs @@ -0,0 +1,117 @@ +using System; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; + +namespace MQTTnet.Implementations +{ + public class MqttServerAdapter : IMqttServerAdapter, IDisposable + { + private CancellationTokenSource _cancellationTokenSource; + private Socket _defaultEndpointSocket; + private Socket _tlsEndpointSocket; + private X509Certificate2 _tlsCertificate; + + private bool _isRunning; + + public event EventHandler ClientConnected; + + public void Start(MqttServerOptions options) + { + if (_isRunning) throw new InvalidOperationException("Server is already started."); + _isRunning = true; + + _cancellationTokenSource = new CancellationTokenSource(); + + if (options.DefaultEndpointOptions.IsEnabled) + { + _defaultEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _defaultEndpointSocket.Bind(new IPEndPoint(IPAddress.Any, options.GetDefaultEndpointPort())); + _defaultEndpointSocket.Listen(options.ConnectionBacklog); + + Task.Run(() => AcceptDefaultEndpointConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); + } + + if (options.TlsEndpointOptions.IsEnabled) + { + if (options.TlsEndpointOptions.Certificate == null) + { + throw new ArgumentException("TLS certificate is not set."); + } + + _tlsCertificate = new X509Certificate2(options.TlsEndpointOptions.Certificate); + + _tlsEndpointSocket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _tlsEndpointSocket.Bind(new IPEndPoint(IPAddress.Any, options.GetTlsEndpointPort())); + _tlsEndpointSocket.Listen(options.ConnectionBacklog); + + Task.Run(() => AcceptTlsEndpointConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); + } + } + + public void Stop() + { + _isRunning = false; + + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + _defaultEndpointSocket?.Dispose(); + _defaultEndpointSocket = null; + + _tlsEndpointSocket?.Dispose(); + _tlsEndpointSocket = null; + } + + public void Dispose() + { + Stop(); + } + + private async Task AcceptDefaultEndpointConnectionsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + var clientSocket = await _defaultEndpointSocket.AcceptAsync(); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer()); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while acceping connection at default endpoint."); + } + } + } + + private async Task AcceptTlsEndpointConnectionsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + var clientSocket = await _defaultEndpointSocket.AcceptAsync(); + + var sslStream = new SslStream(new NetworkStream(clientSocket)); + await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); + + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer()); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + } + catch (Exception exception) + { + MqttTrace.Error(nameof(MqttServerAdapter), exception, "Error while acceping connection at TLS endpoint."); + } + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs new file mode 100644 index 0000000..e78cd98 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -0,0 +1,122 @@ +using System; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; + +namespace MQTTnet.Implementations +{ + public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable + { + private readonly Socket _socket; + private SslStream _sslStream; + + public MqttTcpChannel() + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + + public MqttTcpChannel(Socket socket, SslStream sslStream) + { + _socket = socket ?? throw new ArgumentNullException(nameof(socket)); + _sslStream = sslStream; + } + + public async Task ConnectAsync(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + try + { + await _socket.ConnectAsync(options.Server, options.GetPort()); + + if (options.TlsOptions.UseTls) + { + _sslStream = new SslStream(new NetworkStream(_socket, true)); + await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); + } + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public Task DisconnectAsync() + { + try + { + _sslStream.Dispose(); + _socket.Dispose(); + return Task.FromResult(0); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + try + { + if (_sslStream != null) + { + return _sslStream.WriteAsync(buffer, 0, buffer.Length); + } + + return _socket.SendAsync(new ArraySegment(buffer), SocketFlags.None); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public Task ReadAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + try + { + if (_sslStream != null) + { + return _sslStream.ReadAsync(buffer, 0, buffer.Length); + } + + return _socket.ReceiveAsync(new ArraySegment(buffer), SocketFlags.None); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public void Dispose() + { + _socket?.Dispose(); + _sslStream?.Dispose(); + } + + private static X509CertificateCollection LoadCertificates(MqttClientOptions options) + { + var certificates = new X509CertificateCollection(); + if (options.TlsOptions.Certificates == null) + { + return certificates; + } + + foreach (var certificate in options.TlsOptions.Certificates) + { + certificates.Add(new X509Certificate(certificate)); + } + + return certificates; + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj new file mode 100644 index 0000000..10ffeed --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -0,0 +1,21 @@ + + + + netstandard1.3 + MQTTnet + MQTTnet + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs new file mode 100644 index 0000000..ac6f611 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -0,0 +1,18 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; +using MQTTnet.Implementations; + +namespace MQTTnet +{ + public class MqttClientFactory + { + public MqttClient CreateMqttClient(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + } + } +} diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs new file mode 100644 index 0000000..eb7441c --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Server; +using MQTTnet.Implementations; + +namespace MQTTnet +{ + public class MqttServerFactory + { + public MqttServer CreateMqttServer(MqttServerOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttServer(options, new List { new MqttServerAdapter() }); + } + } +}