Added TLS1.2 support to client and server.release/3.x.x
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Core.Adapter; | using MQTTnet.Core.Adapter; | ||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | using MQTTnet.Core.Client; | ||||
using MQTTnet.Core.Serializer; | using MQTTnet.Core.Serializer; | ||||
@@ -11,7 +12,9 @@ namespace MQTTnet | |||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | if (options == null) throw new ArgumentNullException(nameof(options)); | ||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); | |||||
return new MqttClient(options, | |||||
new MqttChannelCommunicationAdapter(options.UseSSL ? new MqttClientSslChannel() : (IMqttCommunicationChannel) new MqttTcpChannel(), | |||||
new DefaultMqttV311PacketSerializer())); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,122 @@ | |||||
using System; | |||||
using System.IO; | |||||
using System.Net.Security; | |||||
using System.Net.Sockets; | |||||
using System.Security.Authentication; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet | |||||
{ | |||||
/// <summary> | |||||
/// Describes an SSL channel to an MQTT server. | |||||
/// </summary> | |||||
public class MqttClientSslChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private readonly Socket _socket; | |||||
private SslStream _sslStream; | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public MqttClientSslChannel() | |||||
{ | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | |||||
} | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/> with a predefined <paramref name="socket"/>. | |||||
/// </summary> | |||||
/// <param name="socket"></param> | |||||
public MqttClientSslChannel(Socket socket) | |||||
{ | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously connects to the host described in the <see cref="MqttClientOptions"/>. | |||||
/// </summary> | |||||
/// <param name="options">The <see cref="MqttClientOptions"/> describing the connection.</param> | |||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
try | |||||
{ | |||||
await _socket.ConnectAsync(options.Server, options.Port); | |||||
NetworkStream ns = new NetworkStream(_socket, true); | |||||
_sslStream = new SslStream(ns); | |||||
await _sslStream.AuthenticateAsClientAsync(options.Server, null, SslProtocols.Tls12, false); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously disconnects the client from the server. | |||||
/// </summary> | |||||
public Task DisconnectAsync() | |||||
{ | |||||
try | |||||
{ | |||||
_socket.Dispose(); | |||||
return Task.FromResult(0); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously writes a sequence of bytes to the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write data from.</param> | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) | |||||
throw new ArgumentNullException(nameof(buffer)); | |||||
try | |||||
{ | |||||
return _sslStream.WriteAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously reads a sequence of bytes from the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write the data into.</param> | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
try | |||||
{ | |||||
return _sslStream.ReadAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Releases all resources used by the <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
_sslStream?.Dispose(); | |||||
_socket?.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,4 +1,5 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Server; | using MQTTnet.Core.Server; | ||||
namespace MQTTnet | namespace MQTTnet | ||||
@@ -8,8 +9,8 @@ namespace MQTTnet | |||||
public MqttServer CreateMqttServer(MqttServerOptions options) | public MqttServer CreateMqttServer(MqttServerOptions options) | ||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | if (options == null) throw new ArgumentNullException(nameof(options)); | ||||
return new MqttServer(options, new MqttServerAdapter()); | |||||
return new MqttServer(options, options.UseSSL ? (IMqttServerAdapter)new MqttSslServerAdapter() : new MqttServerAdapter()); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,123 @@ | |||||
using System; | |||||
using System.IO; | |||||
using System.Net.Security; | |||||
using System.Net.Sockets; | |||||
using System.Security.Authentication; | |||||
using System.Security.Cryptography.X509Certificates; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet | |||||
{ | |||||
/// <summary> | |||||
/// Describes an SSL channel to a client. | |||||
/// </summary> | |||||
public class MqttServerSslChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private readonly Socket _socket; | |||||
private readonly SslStream _sslStream; | |||||
private readonly X509Certificate2 _cert; | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/> with a predefined <paramref name="socket"/>. | |||||
/// </summary> | |||||
/// <param name="socket">The client socket.</param> | |||||
/// <param name="cert">The X509 certificate used to authenticate as a server.</param> | |||||
public MqttServerSslChannel(Socket socket, X509Certificate2 cert) | |||||
{ | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | |||||
_cert = cert ?? throw new ArgumentNullException(nameof(cert)); | |||||
if (!_socket.Connected) | |||||
return; | |||||
NetworkStream ns = new NetworkStream(_socket, true); | |||||
_sslStream = new SslStream(ns); | |||||
} | |||||
public Task Authenticate() | |||||
{ | |||||
return _sslStream.AuthenticateAsServerAsync(_cert, false, SslProtocols.Tls12, false); | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously connects to the client described in the <see cref="MqttClientOptions"/>. | |||||
/// </summary> | |||||
/// <param name="options">The <see cref="MqttClientOptions"/> describing the connection.</param> | |||||
public Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
try | |||||
{ | |||||
return _socket.ConnectAsync(options.Server, options.Port); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously disconnects the client from the server. | |||||
/// </summary> | |||||
public Task DisconnectAsync() | |||||
{ | |||||
try | |||||
{ | |||||
_socket.Dispose(); | |||||
return Task.FromResult(0); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously writes a sequence of bytes to the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write data from.</param> | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) | |||||
throw new ArgumentNullException(nameof(buffer)); | |||||
try | |||||
{ | |||||
return _sslStream.WriteAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously reads a sequence of bytes from the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write the data into.</param> | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
try | |||||
{ | |||||
return _sslStream.ReadAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Releases all resources used by the <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
_sslStream?.Dispose(); | |||||
_socket?.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,64 @@ | |||||
using System; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Security.Cryptography.X509Certificates; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Serializer; | |||||
using MQTTnet.Core.Server; | |||||
namespace MQTTnet | |||||
{ | |||||
public class MqttSslServerAdapter : IMqttServerAdapter, IDisposable | |||||
{ | |||||
private CancellationTokenSource _cancellationTokenSource; | |||||
private Socket _socket; | |||||
private X509Certificate2 _x590Certificate2; | |||||
public event EventHandler<MqttClientConnectedEventArgs> ClientConnected; | |||||
public void Start(MqttServerOptions options) | |||||
{ | |||||
if (_socket != null) throw new InvalidOperationException("Server is already started."); | |||||
_cancellationTokenSource = new CancellationTokenSource(); | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | |||||
_socket.Bind(new IPEndPoint(IPAddress.Any, options.Port)); | |||||
_socket.Listen(options.ConnectionBacklog); | |||||
Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); | |||||
_x590Certificate2 = new X509Certificate2(options.CertificatePath); | |||||
} | |||||
public void Stop() | |||||
{ | |||||
_cancellationTokenSource?.Dispose(); | |||||
_cancellationTokenSource = null; | |||||
_socket?.Dispose(); | |||||
_socket = null; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Stop(); | |||||
} | |||||
private async Task AcceptConnectionsAsync(CancellationToken cancellationToken) | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
var clientSocket = await _socket.AcceptAsync(); | |||||
MqttServerSslChannel mssc = new MqttServerSslChannel(clientSocket, _x590Certificate2); | |||||
await mssc.Authenticate(); | |||||
var clientAdapter = new MqttChannelCommunicationAdapter(mssc, new DefaultMqttV311PacketSerializer()); | |||||
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -21,11 +21,11 @@ namespace MQTTnet | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | _socket = socket ?? throw new ArgumentNullException(nameof(socket)); | ||||
} | } | ||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
public Task ConnectAsync(MqttClientOptions options) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await _socket.ConnectAsync(options.Server, options.Port); | |||||
return _socket.ConnectAsync(options.Server, options.Port); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -33,12 +33,12 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task DisconnectAsync() | |||||
public Task DisconnectAsync() | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
_socket.Dispose(); | _socket.Dispose(); | ||||
await Task.FromResult(0); | |||||
return Task.FromResult(0); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -46,13 +46,13 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task WriteAsync(byte[] buffer) | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | { | ||||
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); | if (buffer == null) throw new ArgumentNullException(nameof(buffer)); | ||||
try | try | ||||
{ | { | ||||
await _socket.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None); | |||||
return _socket.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -60,12 +60,12 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task ReadAsync(byte[] buffer) | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
var buffer2 = new ArraySegment<byte>(buffer); | var buffer2 = new ArraySegment<byte>(buffer); | ||||
await _socket.ReceiveAsync(buffer2, SocketFlags.None); | |||||
return _socket.ReceiveAsync(buffer2, SocketFlags.None); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -103,6 +103,9 @@ | |||||
<Compile Include="MqttClientFactory.cs" /> | <Compile Include="MqttClientFactory.cs" /> | ||||
<Compile Include="MqttServerAdapter.cs" /> | <Compile Include="MqttServerAdapter.cs" /> | ||||
<Compile Include="MqttServerFactory.cs" /> | <Compile Include="MqttServerFactory.cs" /> | ||||
<Compile Include="MqttClientSslChannel.cs" /> | |||||
<Compile Include="MqttServerSslChannel.cs" /> | |||||
<Compile Include="MqttSslServerAdapter.cs" /> | |||||
<Compile Include="Properties\AssemblyInfo.cs" /> | <Compile Include="Properties\AssemblyInfo.cs" /> | ||||
<Compile Include="MqttTcpChannel.cs" /> | <Compile Include="MqttTcpChannel.cs" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -1,5 +1,6 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Core.Adapter; | using MQTTnet.Core.Adapter; | ||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | using MQTTnet.Core.Client; | ||||
using MQTTnet.Core.Serializer; | using MQTTnet.Core.Serializer; | ||||
@@ -10,8 +11,10 @@ namespace MQTTnet | |||||
public MqttClient CreateMqttClient(MqttClientOptions options) | public MqttClient CreateMqttClient(MqttClientOptions options) | ||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | if (options == null) throw new ArgumentNullException(nameof(options)); | ||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); | |||||
return new MqttClient(options, | |||||
new MqttChannelCommunicationAdapter(options.UseSSL ? new MqttClientSslChannel() : (IMqttCommunicationChannel)new MqttTcpChannel(), | |||||
new DefaultMqttV311PacketSerializer())); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,121 @@ | |||||
using System; | |||||
using System.IO; | |||||
using System.Net.Security; | |||||
using System.Net.Sockets; | |||||
using System.Security.Authentication; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet | |||||
{ | |||||
/// <summary> | |||||
/// Describes an SSL channel to an MQTT server. | |||||
/// </summary> | |||||
public class MqttClientSslChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private readonly Socket _socket; | |||||
private SslStream _sslStream; | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public MqttClientSslChannel() | |||||
{ | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | |||||
} | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/> with a predefined <paramref name="socket"/>. | |||||
/// </summary> | |||||
/// <param name="socket"></param> | |||||
public MqttClientSslChannel(Socket socket) | |||||
{ | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously connects to the host described in the <see cref="MqttClientOptions"/>. | |||||
/// </summary> | |||||
/// <param name="options">The <see cref="MqttClientOptions"/> describing the connection.</param> | |||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
try | |||||
{ | |||||
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, | |||||
null); | |||||
NetworkStream ns = new NetworkStream(_socket, true); | |||||
_sslStream = new SslStream(ns); | |||||
await _sslStream.AuthenticateAsClientAsync(options.Server, null, SslProtocols.Tls12, false); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously disconnects the client from the server. | |||||
/// </summary> | |||||
public Task DisconnectAsync() | |||||
{ | |||||
try | |||||
{ | |||||
return Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously writes a sequence of bytes to the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write data from.</param> | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) | |||||
throw new ArgumentNullException(nameof(buffer)); | |||||
try | |||||
{ | |||||
return _sslStream.WriteAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously reads a sequence of bytes from the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write the data into.</param> | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
try | |||||
{ | |||||
return _sslStream.ReadAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Releases all resources used by the <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
_sslStream?.Dispose(); | |||||
_socket?.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,4 +1,5 @@ | |||||
using System; | using System; | ||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Server; | using MQTTnet.Core.Server; | ||||
namespace MQTTnet | namespace MQTTnet | ||||
@@ -8,8 +9,8 @@ namespace MQTTnet | |||||
public MqttServer CreateMqttServer(MqttServerOptions options) | public MqttServer CreateMqttServer(MqttServerOptions options) | ||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | if (options == null) throw new ArgumentNullException(nameof(options)); | ||||
return new MqttServer(options, new MqttServerAdapter()); | |||||
return new MqttServer(options, options.UseSSL ? (IMqttServerAdapter) new MqttSslServerAdapter() : new MqttServerAdapter()); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,123 @@ | |||||
using System; | |||||
using System.IO; | |||||
using System.Net.Security; | |||||
using System.Net.Sockets; | |||||
using System.Security.Authentication; | |||||
using System.Security.Cryptography.X509Certificates; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
namespace MQTTnet | |||||
{ | |||||
/// <summary> | |||||
/// Describes an SSL channel to a client. | |||||
/// </summary> | |||||
public class MqttServerSslChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private readonly Socket _socket; | |||||
private SslStream _sslStream; | |||||
private X509Certificate2 _cert; | |||||
/// <summary> | |||||
/// Creates a new <see cref="MqttClientSslChannel"/> with a predefined <paramref name="socket"/>. | |||||
/// </summary> | |||||
/// <param name="socket">The client socket.</param> | |||||
/// <param name="cert">The X509 certificate used to authenticate as a server.</param> | |||||
public MqttServerSslChannel(Socket socket, X509Certificate2 cert) | |||||
{ | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | |||||
_cert = cert ?? throw new ArgumentNullException(nameof(cert)); | |||||
if (!_socket.Connected) | |||||
return; | |||||
NetworkStream ns = new NetworkStream(_socket, true); | |||||
_sslStream = new SslStream(ns); | |||||
} | |||||
public async Task Authenticate() | |||||
{ | |||||
await _sslStream.AuthenticateAsServerAsync(_cert, false, SslProtocols.Tls12, false); | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously connects to the client described in the <see cref="MqttClientOptions"/>. | |||||
/// </summary> | |||||
/// <param name="options">The <see cref="MqttClientOptions"/> describing the connection.</param> | |||||
public Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
try | |||||
{ | |||||
return Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, | |||||
null); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously disconnects the client from the server. | |||||
/// </summary> | |||||
public Task DisconnectAsync() | |||||
{ | |||||
try | |||||
{ | |||||
return Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); | |||||
} | |||||
catch (SocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously writes a sequence of bytes to the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write data from.</param> | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) | |||||
throw new ArgumentNullException(nameof(buffer)); | |||||
try | |||||
{ | |||||
return _sslStream.WriteAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Asynchronously reads a sequence of bytes from the socket. | |||||
/// </summary> | |||||
/// <param name="buffer">The buffer to write the data into.</param> | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
try | |||||
{ | |||||
return _sslStream.ReadAsync(buffer, 0, buffer.Length); | |||||
} | |||||
catch (Exception ex) | |||||
when (ex is SocketException || ex is IOException) | |||||
{ | |||||
throw new MqttCommunicationException(ex); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// Releases all resources used by the <see cref="MqttClientSslChannel"/>. | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
_sslStream?.Dispose(); | |||||
_socket?.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,64 @@ | |||||
using System; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Security.Cryptography.X509Certificates; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Serializer; | |||||
using MQTTnet.Core.Server; | |||||
namespace MQTTnet | |||||
{ | |||||
public class MqttSslServerAdapter : IMqttServerAdapter, IDisposable | |||||
{ | |||||
private CancellationTokenSource _cancellationTokenSource; | |||||
private Socket _socket; | |||||
private X509Certificate2 _x590Certificate2; | |||||
public event EventHandler<MqttClientConnectedEventArgs> ClientConnected; | |||||
public void Start(MqttServerOptions options) | |||||
{ | |||||
if (_socket != null) throw new InvalidOperationException("Server is already started."); | |||||
_cancellationTokenSource = new CancellationTokenSource(); | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | |||||
_socket.Bind(new IPEndPoint(IPAddress.Any, options.Port)); | |||||
_socket.Listen(options.ConnectionBacklog); | |||||
Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); | |||||
_x590Certificate2 = new X509Certificate2(options.CertificatePath); | |||||
} | |||||
public void Stop() | |||||
{ | |||||
_cancellationTokenSource?.Dispose(); | |||||
_cancellationTokenSource = null; | |||||
_socket?.Dispose(); | |||||
_socket = null; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Stop(); | |||||
} | |||||
private async Task AcceptConnectionsAsync(CancellationToken cancellationToken) | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null); | |||||
MqttServerSslChannel mssc = new MqttServerSslChannel(clientSocket, _x590Certificate2); | |||||
await mssc.Authenticate(); | |||||
var clientAdapter = new MqttChannelCommunicationAdapter(mssc, new DefaultMqttV311PacketSerializer()); | |||||
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -21,11 +21,11 @@ namespace MQTTnet | |||||
_socket = socket ?? throw new ArgumentNullException(nameof(socket)); | _socket = socket ?? throw new ArgumentNullException(nameof(socket)); | ||||
} | } | ||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
public Task ConnectAsync(MqttClientOptions options) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null); | |||||
return Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -33,11 +33,11 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task DisconnectAsync() | |||||
public Task DisconnectAsync() | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); | |||||
return Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); | |||||
} | } | ||||
catch (SocketException exception) | catch (SocketException exception) | ||||
{ | { | ||||
@@ -45,13 +45,13 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task WriteAsync(byte[] buffer) | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | { | ||||
if (buffer == null) throw new ArgumentNullException(nameof(buffer)); | if (buffer == null) throw new ArgumentNullException(nameof(buffer)); | ||||
try | try | ||||
{ | { | ||||
await Task.Factory.FromAsync( | |||||
return Task.Factory.FromAsync( | |||||
// ReSharper disable once AssignNullToNotNullAttribute | // ReSharper disable once AssignNullToNotNullAttribute | ||||
_socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null), | _socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null), | ||||
_socket.EndSend); | _socket.EndSend); | ||||
@@ -62,11 +62,11 @@ namespace MQTTnet | |||||
} | } | ||||
} | } | ||||
public async Task ReadAsync(byte[] buffer) | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await Task.Factory.FromAsync( | |||||
return Task.Factory.FromAsync( | |||||
// ReSharper disable once AssignNullToNotNullAttribute | // ReSharper disable once AssignNullToNotNullAttribute | ||||
_socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null), | _socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null), | ||||
_socket.EndReceive); | _socket.EndReceive); | ||||
@@ -19,5 +19,24 @@ namespace MQTTnet.Core.Client | |||||
public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); | public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5); | ||||
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | ||||
/// <summary> | |||||
/// Use SSL to communicate with the MQTT server. | |||||
/// </summary> | |||||
/// <remarks>Setting this value to <c>true</c> will also set <see cref="Port"/> to <c>8883</c> if its value was <c>1883</c> (not set).</remarks> | |||||
public bool UseSSL | |||||
{ | |||||
get => _useSSL; | |||||
set | |||||
{ | |||||
// Automatically set the port to the MQTT SSL port (8883) if it wasn't set already | |||||
if (value && Port == 1883) | |||||
Port = 8883; | |||||
_useSSL = value; | |||||
} | |||||
} | |||||
private bool _useSSL; | |||||
} | } | ||||
} | } |
@@ -13,5 +13,12 @@ namespace MQTTnet.Core.Server | |||||
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | ||||
public Func<MqttConnectPacket, MqttConnectReturnCode> ConnectionValidator { get; set; } | public Func<MqttConnectPacket, MqttConnectReturnCode> ConnectionValidator { get; set; } | ||||
public bool UseSSL = false; | |||||
/// <summary> | |||||
/// The path to the X509 SSL certificate. | |||||
/// </summary> | |||||
public string CertificatePath = string.Empty; | |||||
} | } | ||||
} | } |