@@ -15,6 +15,7 @@ | |||||
* [Core] Added extension method to allow usage of _WebSocket4Net_ in clients to fix issues with AWS and Xamarin. | * [Core] Added extension method to allow usage of _WebSocket4Net_ in clients to fix issues with AWS and Xamarin. | ||||
* [Core] Fixed usage of wrong data type for passwords (string -> byte[]). | * [Core] Fixed usage of wrong data type for passwords (string -> byte[]). | ||||
* [Core] Fixed an _ObjectDisposedException_ when sending data using a WebSocket channel. | * [Core] Fixed an _ObjectDisposedException_ when sending data using a WebSocket channel. | ||||
* [Core] Performance optimizations. | |||||
* [Client] Added support for extended authentication exchange. | * [Client] Added support for extended authentication exchange. | ||||
* [Client] Exposed MQTTv5 CONNACK values to client. | * [Client] Exposed MQTTv5 CONNACK values to client. | ||||
* [Client] Added _MqttClientSubscribeOptionsBuilder_. | * [Client] Added _MqttClientSubscribeOptionsBuilder_. | ||||
@@ -9,6 +9,7 @@ namespace MQTTnet.AspNetCore | |||||
public class SpanBasedMqttPacketBodyReader : IMqttPacketBodyReader | public class SpanBasedMqttPacketBodyReader : IMqttPacketBodyReader | ||||
{ | { | ||||
private ReadOnlyMemory<byte> _buffer; | private ReadOnlyMemory<byte> _buffer; | ||||
private int _offset; | private int _offset; | ||||
public void SetBuffer(ReadOnlyMemory<byte> buffer) | public void SetBuffer(ReadOnlyMemory<byte> buffer) | ||||
@@ -17,11 +18,11 @@ namespace MQTTnet.AspNetCore | |||||
_offset = 0; | _offset = 0; | ||||
} | } | ||||
public ulong Length => (ulong)_buffer.Length; | |||||
public int Length => _buffer.Length; | |||||
public bool EndOfStream => _buffer.Length.Equals(_offset); | public bool EndOfStream => _buffer.Length.Equals(_offset); | ||||
public ulong Offset => (ulong)_offset; | |||||
public int Offset => _offset; | |||||
public byte ReadByte() | public byte ReadByte() | ||||
{ | { | ||||
@@ -116,9 +117,9 @@ namespace MQTTnet.AspNetCore | |||||
throw new MqttProtocolViolationException("Boolean values can be 0 or 1 only."); | throw new MqttProtocolViolationException("Boolean values can be 0 or 1 only."); | ||||
} | } | ||||
public void Seek(ulong position) | |||||
public void Seek(int position) | |||||
{ | { | ||||
_offset = (int)position; | |||||
_offset = position; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -31,7 +31,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
private ManagedMqttClientStorageManager _storageManager; | private ManagedMqttClientStorageManager _storageManager; | ||||
private bool _disposed = false; | |||||
private bool _disposed; | |||||
private bool _subscriptionsNotPushed; | private bool _subscriptionsNotPushed; | ||||
public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) | public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger) | ||||
@@ -102,9 +102,8 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
_connectionCancellationToken = new CancellationTokenSource(); | _connectionCancellationToken = new CancellationTokenSource(); | ||||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
_maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); | _maintainConnectionTask = Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); | ||||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||||
_maintainConnectionTask.Forget(_logger); | |||||
_logger.Info("Started"); | _logger.Info("Started"); | ||||
} | } | ||||
@@ -333,20 +332,20 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
} | } | ||||
} | } | ||||
private void PublishQueuedMessages(CancellationToken cancellationToken) | |||||
private async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) | while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected) | ||||
{ | { | ||||
//Peek at the message without dequeueing in order to prevent the | |||||
//possibility of the queue growing beyond the configured cap. | |||||
//Previously, messages could be re-enqueued if there was an | |||||
//exception, and this re-enqueueing did not honor the cap. | |||||
//Furthermore, because re-enqueueing would shuffle the order | |||||
//of the messages, the DropOldestQueuedMessage strategy would | |||||
//be unable to know which message is actually the oldest and would | |||||
//instead drop the first item in the queue. | |||||
// Peek at the message without dequeueing in order to prevent the | |||||
// possibility of the queue growing beyond the configured cap. | |||||
// Previously, messages could be re-enqueued if there was an | |||||
// exception, and this re-enqueueing did not honor the cap. | |||||
// Furthermore, because re-enqueueing would shuffle the order | |||||
// of the messages, the DropOldestQueuedMessage strategy would | |||||
// be unable to know which message is actually the oldest and would | |||||
// instead drop the first item in the queue. | |||||
var message = _messageQueue.PeekAndWait(); | var message = _messageQueue.PeekAndWait(); | ||||
if (message == null) | if (message == null) | ||||
{ | { | ||||
@@ -355,7 +354,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
cancellationToken.ThrowIfCancellationRequested(); | cancellationToken.ThrowIfCancellationRequested(); | ||||
TryPublishQueuedMessage(message); | |||||
await TryPublishQueuedMessageAsync(message).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
@@ -371,23 +370,28 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
} | } | ||||
} | } | ||||
private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message) | |||||
private async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message) | |||||
{ | { | ||||
Exception transmitException = null; | Exception transmitException = null; | ||||
try | try | ||||
{ | { | ||||
_mqttClient.PublishAsync(message.ApplicationMessage).GetAwaiter().GetResult(); | |||||
await _mqttClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false); | |||||
lock (_messageQueue) //lock to avoid conflict with this.PublishAsync | lock (_messageQueue) //lock to avoid conflict with this.PublishAsync | ||||
{ | { | ||||
//While publishing this message, this.PublishAsync could have booted this | |||||
//message off the queue to make room for another (when using a cap | |||||
//with the DropOldestQueuedMessage strategy). If the first item | |||||
//in the queue is equal to this message, then it's safe to remove | |||||
//it from the queue. If not, that means this.PublishAsync has already | |||||
//removed it, in which case we don't want to do anything. | |||||
// While publishing this message, this.PublishAsync could have booted this | |||||
// message off the queue to make room for another (when using a cap | |||||
// with the DropOldestQueuedMessage strategy). If the first item | |||||
// in the queue is equal to this message, then it's safe to remove | |||||
// it from the queue. If not, that means this.PublishAsync has already | |||||
// removed it, in which case we don't want to do anything. | |||||
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); | _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); | ||||
} | } | ||||
_storageManager?.RemoveAsync(message).GetAwaiter().GetResult(); | |||||
if (_storageManager != null) | |||||
{ | |||||
await _storageManager.RemoveAsync(message).ConfigureAwait(false); | |||||
} | |||||
} | } | ||||
catch (MqttCommunicationException exception) | catch (MqttCommunicationException exception) | ||||
{ | { | ||||
@@ -408,6 +412,11 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
{ | { | ||||
_messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); | _messageQueue.RemoveFirst(i => i.Id.Equals(message.Id)); | ||||
} | } | ||||
if (_storageManager != null) | |||||
{ | |||||
await _storageManager.RemoveAsync(message).ConfigureAwait(false); | |||||
} | |||||
} | } | ||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
@@ -417,7 +426,12 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
ApplicationMessageProcessedHandler?.HandleApplicationMessageProcessedAsync(new ApplicationMessageProcessedEventArgs(message, transmitException)).GetAwaiter().GetResult(); | |||||
var eventHandler = ApplicationMessageProcessedHandler; | |||||
if (eventHandler != null) | |||||
{ | |||||
var eventArguments = new ApplicationMessageProcessedEventArgs(message, transmitException); | |||||
await eventHandler.HandleApplicationMessageProcessedAsync(eventArguments).ConfigureAwait(false); | |||||
} | |||||
} | } | ||||
} | } | ||||
@@ -502,7 +516,7 @@ namespace MQTTnet.Extensions.ManagedClient | |||||
var cts = new CancellationTokenSource(); | var cts = new CancellationTokenSource(); | ||||
_publishingCancellationToken = cts; | _publishingCancellationToken = cts; | ||||
Task.Factory.StartNew(() => PublishQueuedMessages(cts.Token), cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); | |||||
Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token).Forget(_logger); | |||||
} | } | ||||
private void StopPublishing() | private void StopPublishing() | ||||
@@ -29,10 +29,10 @@ namespace MQTTnet.Server.Mqtt | |||||
{ "client_id", context.ClientId }, | { "client_id", context.ClientId }, | ||||
{ "username", context.Username }, | { "username", context.Username }, | ||||
{ "password", context.Password }, | { "password", context.Password }, | ||||
{ "raw_password", new Bytes(context.RawPassword) }, | |||||
{ "raw_password", new Bytes(context.RawPassword ?? new byte[0]) }, | |||||
{ "clean_session", context.CleanSession}, | { "clean_session", context.CleanSession}, | ||||
{ "authentication_method", context.AuthenticationMethod}, | { "authentication_method", context.AuthenticationMethod}, | ||||
{ "authentication_data", new Bytes(context.AuthenticationData)}, | |||||
{ "authentication_data", new Bytes(context.AuthenticationData ?? new byte[0]) }, | |||||
{ "result", PythonConvert.Pythonfy(context.ReasonCode) } | { "result", PythonConvert.Pythonfy(context.ReasonCode) } | ||||
}; | }; | ||||
@@ -44,6 +44,8 @@ namespace MQTTnet.Server.Mqtt | |||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
_logger.LogError(exception, "Error while validating client connection."); | _logger.LogError(exception, "Error while validating client connection."); | ||||
context.ReasonCode = MqttConnectReasonCode.UnspecifiedError; | |||||
} | } | ||||
return Task.CompletedTask; | return Task.CompletedTask; | ||||
@@ -66,7 +66,10 @@ namespace MQTTnet.Server.Mqtt | |||||
var adapters = new List<IMqttServerAdapter> | var adapters = new List<IMqttServerAdapter> | ||||
{ | { | ||||
new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()), | |||||
new MqttTcpServerAdapter(mqttFactory.Logger.CreateChildLogger()) | |||||
{ | |||||
TreatSocketOpeningErrorAsWarning = true // Opening other ports than for HTTP is not allows in Azure App Services. | |||||
}, | |||||
_webSocketServerAdapter | _webSocketServerAdapter | ||||
}; | }; | ||||
@@ -2,10 +2,10 @@ | |||||
"Kestrel": { | "Kestrel": { | ||||
"EndPoints": { | "EndPoints": { | ||||
"Http": { | "Http": { | ||||
"Url": "http://localhost:80" | |||||
"Url": "http://*:80" | |||||
}, | }, | ||||
"Https": { | "Https": { | ||||
"Url": "http://localhost:443" | |||||
"Url": "http://*:443" | |||||
} | } | ||||
} | } | ||||
}, | }, | ||||
@@ -4,9 +4,9 @@ namespace MQTTnet.Formatter | |||||
{ | { | ||||
public interface IMqttPacketBodyReader | public interface IMqttPacketBodyReader | ||||
{ | { | ||||
ulong Length { get; } | |||||
int Length { get; } | |||||
ulong Offset { get; } | |||||
int Offset { get; } | |||||
bool EndOfStream { get; } | bool EndOfStream { get; } | ||||
@@ -26,6 +26,6 @@ namespace MQTTnet.Formatter | |||||
bool ReadBoolean(); | bool ReadBoolean(); | ||||
void Seek(ulong position); | |||||
void Seek(int position); | |||||
} | } | ||||
} | } |
@@ -9,52 +9,42 @@ namespace MQTTnet.Formatter | |||||
public class MqttPacketBodyReader : IMqttPacketBodyReader | public class MqttPacketBodyReader : IMqttPacketBodyReader | ||||
{ | { | ||||
private readonly byte[] _buffer; | private readonly byte[] _buffer; | ||||
private readonly ulong _initialOffset; | |||||
private readonly ulong _length; | |||||
private readonly int _initialOffset; | |||||
private readonly int _length; | |||||
public MqttPacketBodyReader(byte[] buffer, int offset, int length) | |||||
: this(buffer, (ulong)offset, (ulong)length) | |||||
{ | |||||
} | |||||
private int _offset; | |||||
public MqttPacketBodyReader(byte[] buffer, ulong offset, ulong length) | |||||
public MqttPacketBodyReader(byte[] buffer, int offset, int length) | |||||
{ | { | ||||
_buffer = buffer; | _buffer = buffer; | ||||
_initialOffset = offset; | _initialOffset = offset; | ||||
Offset = offset; | |||||
_offset = offset; | |||||
_length = length; | _length = length; | ||||
} | } | ||||
public ulong Offset { get; private set; } | |||||
public int Offset => _offset; | |||||
public ulong Length => _length - Offset; | |||||
public int Length => _length - _offset; | |||||
public bool EndOfStream => Offset == _length; | |||||
public bool EndOfStream => _offset == _length; | |||||
public void Seek(ulong position) | |||||
public void Seek(int position) | |||||
{ | { | ||||
Offset = _initialOffset + position; | |||||
} | |||||
public ArraySegment<byte> Read(uint length) | |||||
{ | |||||
ValidateReceiveBuffer(length); | |||||
var buffer = new ArraySegment<byte>(_buffer, (int)Offset, (int)length); | |||||
Offset += length; | |||||
return buffer; | |||||
_offset = _initialOffset + position; | |||||
} | } | ||||
public byte ReadByte() | public byte ReadByte() | ||||
{ | { | ||||
ValidateReceiveBuffer(1); | ValidateReceiveBuffer(1); | ||||
return _buffer[Offset++]; | |||||
} | |||||
return _buffer[_offset++]; | |||||
} | |||||
public bool ReadBoolean() | public bool ReadBoolean() | ||||
{ | { | ||||
ValidateReceiveBuffer(1); | ValidateReceiveBuffer(1); | ||||
var buffer = _buffer[Offset++]; | |||||
var buffer = _buffer[_offset++]; | |||||
if (buffer == 0) | if (buffer == 0) | ||||
{ | { | ||||
@@ -71,15 +61,19 @@ namespace MQTTnet.Formatter | |||||
public byte[] ReadRemainingData() | public byte[] ReadRemainingData() | ||||
{ | { | ||||
return new ArraySegment<byte>(_buffer, (int)Offset, (int)(_length - Offset)).ToArray(); | |||||
var bufferLength = _length - _offset; | |||||
var buffer = new byte[bufferLength]; | |||||
Array.Copy(_buffer, _offset, buffer, 0, bufferLength); | |||||
return buffer; | |||||
} | } | ||||
public ushort ReadTwoByteInteger() | public ushort ReadTwoByteInteger() | ||||
{ | { | ||||
ValidateReceiveBuffer(2); | ValidateReceiveBuffer(2); | ||||
var msb = _buffer[Offset++]; | |||||
var lsb = _buffer[Offset++]; | |||||
var msb = _buffer[_offset++]; | |||||
var lsb = _buffer[_offset++]; | |||||
return (ushort)(msb << 8 | lsb); | return (ushort)(msb << 8 | lsb); | ||||
} | } | ||||
@@ -88,31 +82,14 @@ namespace MQTTnet.Formatter | |||||
{ | { | ||||
ValidateReceiveBuffer(4); | ValidateReceiveBuffer(4); | ||||
var byte0 = _buffer[Offset++]; | |||||
var byte1 = _buffer[Offset++]; | |||||
var byte2 = _buffer[Offset++]; | |||||
var byte3 = _buffer[Offset++]; | |||||
var byte0 = _buffer[_offset++]; | |||||
var byte1 = _buffer[_offset++]; | |||||
var byte2 = _buffer[_offset++]; | |||||
var byte3 = _buffer[_offset++]; | |||||
return (uint)(byte0 << 24 | byte1 << 16 | byte2 << 8 | byte3); | return (uint)(byte0 << 24 | byte1 << 16 | byte2 << 8 | byte3); | ||||
} | } | ||||
public byte[] ReadWithLengthPrefix() | |||||
{ | |||||
return ReadSegmentWithLengthPrefix().ToArray(); | |||||
} | |||||
private ArraySegment<byte> ReadSegmentWithLengthPrefix() | |||||
{ | |||||
var length = ReadTwoByteInteger(); | |||||
ValidateReceiveBuffer(length); | |||||
var result = new ArraySegment<byte>(_buffer, (int)Offset, length); | |||||
Offset += length; | |||||
return result; | |||||
} | |||||
public uint ReadVariableLengthInteger() | public uint ReadVariableLengthInteger() | ||||
{ | { | ||||
var multiplier = 1; | var multiplier = 1; | ||||
@@ -134,13 +111,30 @@ namespace MQTTnet.Formatter | |||||
return value; | return value; | ||||
} | } | ||||
public byte[] ReadWithLengthPrefix() | |||||
{ | |||||
return ReadSegmentWithLengthPrefix().ToArray(); | |||||
} | |||||
private ArraySegment<byte> ReadSegmentWithLengthPrefix() | |||||
{ | |||||
var length = ReadTwoByteInteger(); | |||||
ValidateReceiveBuffer(length); | |||||
var result = new ArraySegment<byte>(_buffer, _offset, length); | |||||
_offset += length; | |||||
return result; | |||||
} | |||||
[MethodImpl(MethodImplOptions.AggressiveInlining)] | [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||||
private void ValidateReceiveBuffer(uint length) | |||||
private void ValidateReceiveBuffer(int length) | |||||
{ | { | ||||
if (_length < Offset + length) | |||||
if (_length < _offset + length) | |||||
{ | { | ||||
throw new ArgumentOutOfRangeException(nameof(_buffer), $"Expected at least {Offset + length} bytes but there are only {_length} bytes"); | |||||
throw new ArgumentOutOfRangeException(nameof(_buffer), $"Expected at least {_offset + length} bytes but there are only {_length} bytes"); | |||||
} | } | ||||
} | } | ||||
@@ -9,8 +9,8 @@ namespace MQTTnet.Formatter.V5 | |||||
public class MqttV500PropertiesReader | public class MqttV500PropertiesReader | ||||
{ | { | ||||
private readonly IMqttPacketBodyReader _body; | private readonly IMqttPacketBodyReader _body; | ||||
private readonly uint _length; | |||||
private readonly ulong _targetOffset; | |||||
private readonly int _length; | |||||
private readonly int _targetOffset; | |||||
public MqttV500PropertiesReader(IMqttPacketBodyReader body) | public MqttV500PropertiesReader(IMqttPacketBodyReader body) | ||||
{ | { | ||||
@@ -18,7 +18,7 @@ namespace MQTTnet.Formatter.V5 | |||||
if (!body.EndOfStream) | if (!body.EndOfStream) | ||||
{ | { | ||||
_length = body.ReadVariableLengthInteger(); | |||||
_length = (int)body.ReadVariableLengthInteger(); | |||||
} | } | ||||
_targetOffset = body.Offset + _length; | _targetOffset = body.Offset + _length; | ||||
@@ -28,6 +28,8 @@ namespace MQTTnet.Implementations | |||||
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } | public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } | ||||
public bool TreatSocketOpeningErrorAsWarning { get; set; } | |||||
public Task StartAsync(IMqttServerOptions options) | public Task StartAsync(IMqttServerOptions options) | ||||
{ | { | ||||
if (_cancellationTokenSource != null) throw new InvalidOperationException("Server is already started."); | if (_cancellationTokenSource != null) throw new InvalidOperationException("Server is already started."); | ||||
@@ -36,7 +38,7 @@ namespace MQTTnet.Implementations | |||||
if (options.DefaultEndpointOptions.IsEnabled) | if (options.DefaultEndpointOptions.IsEnabled) | ||||
{ | { | ||||
RegisterListeners(options.DefaultEndpointOptions, null); | |||||
RegisterListeners(options.DefaultEndpointOptions, null, _cancellationTokenSource.Token); | |||||
} | } | ||||
if (options.TlsEndpointOptions.IsEnabled) | if (options.TlsEndpointOptions.IsEnabled) | ||||
@@ -52,7 +54,7 @@ namespace MQTTnet.Implementations | |||||
throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); | throw new InvalidOperationException("The certificate for TLS encryption must contain the private key."); | ||||
} | } | ||||
RegisterListeners(options.TlsEndpointOptions, tlsCertificate); | |||||
RegisterListeners(options.TlsEndpointOptions, tlsCertificate, _cancellationTokenSource.Token); | |||||
} | } | ||||
return Task.FromResult(0); | return Task.FromResult(0); | ||||
@@ -78,7 +80,7 @@ namespace MQTTnet.Implementations | |||||
_listeners.Clear(); | _listeners.Clear(); | ||||
} | } | ||||
private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate) | |||||
private void RegisterListeners(MqttServerTcpEndpointBaseOptions options, X509Certificate2 tlsCertificate, CancellationToken cancellationToken) | |||||
{ | { | ||||
if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) | if (!options.BoundInterNetworkAddress.Equals(IPAddress.None)) | ||||
{ | { | ||||
@@ -86,14 +88,15 @@ namespace MQTTnet.Implementations | |||||
AddressFamily.InterNetwork, | AddressFamily.InterNetwork, | ||||
options, | options, | ||||
tlsCertificate, | tlsCertificate, | ||||
_cancellationTokenSource.Token, | |||||
_logger) | _logger) | ||||
{ | { | ||||
ClientHandler = OnClientAcceptedAsync | ClientHandler = OnClientAcceptedAsync | ||||
}; | }; | ||||
listenerV4.Start(); | |||||
_listeners.Add(listenerV4); | |||||
if (listenerV4.Start(TreatSocketOpeningErrorAsWarning, cancellationToken)) | |||||
{ | |||||
_listeners.Add(listenerV4); | |||||
} | |||||
} | } | ||||
if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None)) | if (!options.BoundInterNetworkV6Address.Equals(IPAddress.None)) | ||||
@@ -102,14 +105,15 @@ namespace MQTTnet.Implementations | |||||
AddressFamily.InterNetworkV6, | AddressFamily.InterNetworkV6, | ||||
options, | options, | ||||
tlsCertificate, | tlsCertificate, | ||||
_cancellationTokenSource.Token, | |||||
_logger) | _logger) | ||||
{ | { | ||||
ClientHandler = OnClientAcceptedAsync | ClientHandler = OnClientAcceptedAsync | ||||
}; | }; | ||||
listenerV6.Start(); | |||||
_listeners.Add(listenerV6); | |||||
if (listenerV6.Start(TreatSocketOpeningErrorAsWarning, cancellationToken)) | |||||
{ | |||||
_listeners.Add(listenerV6); | |||||
} | |||||
} | } | ||||
} | } | ||||
@@ -10,6 +10,7 @@ using System.Threading.Tasks; | |||||
using MQTTnet.Adapter; | using MQTTnet.Adapter; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Formatter; | using MQTTnet.Formatter; | ||||
using MQTTnet.Internal; | |||||
using MQTTnet.Server; | using MQTTnet.Server; | ||||
namespace MQTTnet.Implementations | namespace MQTTnet.Implementations | ||||
@@ -17,24 +18,23 @@ namespace MQTTnet.Implementations | |||||
public class MqttTcpServerListener : IDisposable | public class MqttTcpServerListener : IDisposable | ||||
{ | { | ||||
private readonly IMqttNetChildLogger _logger; | private readonly IMqttNetChildLogger _logger; | ||||
private readonly CancellationToken _cancellationToken; | |||||
private readonly AddressFamily _addressFamily; | private readonly AddressFamily _addressFamily; | ||||
private readonly MqttServerTcpEndpointBaseOptions _options; | private readonly MqttServerTcpEndpointBaseOptions _options; | ||||
private readonly MqttServerTlsTcpEndpointOptions _tlsOptions; | private readonly MqttServerTlsTcpEndpointOptions _tlsOptions; | ||||
private readonly X509Certificate2 _tlsCertificate; | private readonly X509Certificate2 _tlsCertificate; | ||||
private Socket _socket; | private Socket _socket; | ||||
private IPEndPoint _localEndPoint; | |||||
public MqttTcpServerListener( | public MqttTcpServerListener( | ||||
AddressFamily addressFamily, | AddressFamily addressFamily, | ||||
MqttServerTcpEndpointBaseOptions options, | MqttServerTcpEndpointBaseOptions options, | ||||
X509Certificate2 tlsCertificate, | X509Certificate2 tlsCertificate, | ||||
CancellationToken cancellationToken, | |||||
IMqttNetChildLogger logger) | IMqttNetChildLogger logger) | ||||
{ | { | ||||
_addressFamily = addressFamily; | _addressFamily = addressFamily; | ||||
_options = options; | _options = options; | ||||
_tlsCertificate = tlsCertificate; | _tlsCertificate = tlsCertificate; | ||||
_cancellationToken = cancellationToken; | |||||
_logger = logger.CreateChildLogger(nameof(MqttTcpServerListener)); | _logger = logger.CreateChildLogger(nameof(MqttTcpServerListener)); | ||||
if (_options is MqttServerTlsTcpEndpointOptions tlsOptions) | if (_options is MqttServerTlsTcpEndpointOptions tlsOptions) | ||||
@@ -45,21 +45,38 @@ namespace MQTTnet.Implementations | |||||
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } | public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; } | ||||
public void Start() | |||||
public bool Start(bool treatErrorsAsWarning, CancellationToken cancellationToken) | |||||
{ | { | ||||
var boundIp = _options.BoundInterNetworkAddress; | |||||
if (_addressFamily == AddressFamily.InterNetworkV6) | |||||
try | |||||
{ | { | ||||
boundIp = _options.BoundInterNetworkV6Address; | |||||
} | |||||
var boundIp = _options.BoundInterNetworkAddress; | |||||
if (_addressFamily == AddressFamily.InterNetworkV6) | |||||
{ | |||||
boundIp = _options.BoundInterNetworkV6Address; | |||||
} | |||||
_localEndPoint = new IPEndPoint(boundIp, _options.Port); | |||||
_socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); | |||||
_socket.Bind(new IPEndPoint(boundIp, _options.Port)); | |||||
_logger.Info($"Starting TCP listener for {_localEndPoint} TLS={_tlsCertificate != null}."); | |||||
_logger.Info($"Starting TCP listener for {_socket.LocalEndPoint} TLS={_tlsCertificate != null}."); | |||||
_socket = new Socket(_addressFamily, SocketType.Stream, ProtocolType.Tcp); | |||||
_socket.Bind(_localEndPoint); | |||||
_socket.Listen(_options.ConnectionBacklog); | |||||
_socket.Listen(_options.ConnectionBacklog); | |||||
Task.Run(() => AcceptClientConnectionsAsync(_cancellationToken), _cancellationToken); | |||||
Task.Run(() => AcceptClientConnectionsAsync(cancellationToken), cancellationToken).Forget(_logger); | |||||
return true; | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
if (!treatErrorsAsWarning) | |||||
{ | |||||
throw; | |||||
} | |||||
_logger.Warning(exception,"Error while creating listener socket for local end point '{0}'.", _localEndPoint); | |||||
return false; | |||||
} | |||||
} | } | ||||
public void Dispose() | public void Dispose() | ||||
@@ -82,13 +99,29 @@ namespace MQTTnet.Implementations | |||||
#else | #else | ||||
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); | var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false); | ||||
#endif | #endif | ||||
#pragma warning disable 4014 | |||||
Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken); | |||||
#pragma warning restore 4014 | |||||
if (clientSocket == null) | |||||
{ | |||||
continue; | |||||
} | |||||
Task.Run(() => TryHandleClientConnectionAsync(clientSocket), cancellationToken).Forget(_logger); | |||||
} | |||||
catch (OperationCanceledException) | |||||
{ | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
_logger.Error(exception, $"Error while accepting connection at TCP listener {_socket.LocalEndPoint} TLS={_tlsCertificate != null}."); | |||||
if (exception is SocketException socketException) | |||||
{ | |||||
if (socketException.SocketErrorCode == SocketError.ConnectionAborted || | |||||
socketException.SocketErrorCode == SocketError.OperationAborted) | |||||
{ | |||||
continue; | |||||
} | |||||
} | |||||
_logger.Error(exception, $"Error while accepting connection at TCP listener {_localEndPoint} TLS={_tlsCertificate != null}."); | |||||
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); | ||||
} | } | ||||
} | } | ||||
@@ -105,7 +138,7 @@ namespace MQTTnet.Implementations | |||||
_logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.", | _logger.Verbose("Client '{0}' accepted by TCP listener '{1}, {2}'.", | ||||
remoteEndPoint, | remoteEndPoint, | ||||
_socket.LocalEndPoint, | |||||
_localEndPoint, | |||||
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); | _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); | ||||
clientSocket.NoDelay = _options.NoDelay; | clientSocket.NoDelay = _options.NoDelay; | ||||
@@ -163,7 +196,7 @@ namespace MQTTnet.Implementations | |||||
_logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.", | _logger.Verbose("Client '{0}' disconnected at TCP listener '{1}, {2}'.", | ||||
remoteEndPoint, | remoteEndPoint, | ||||
_socket.LocalEndPoint, | |||||
_localEndPoint, | |||||
_addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); | _addressFamily == AddressFamily.InterNetwork ? "ipv4" : "ipv6"); | ||||
} | } | ||||
catch (Exception disposeException) | catch (Exception disposeException) | ||||
@@ -7,6 +7,7 @@ using MQTTnet.Client; | |||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Exceptions; | using MQTTnet.Exceptions; | ||||
using MQTTnet.Formatter; | using MQTTnet.Formatter; | ||||
using MQTTnet.Internal; | |||||
using MQTTnet.PacketDispatcher; | using MQTTnet.PacketDispatcher; | ||||
using MQTTnet.Packets; | using MQTTnet.Packets; | ||||
using MQTTnet.Protocol; | using MQTTnet.Protocol; | ||||
@@ -134,9 +135,7 @@ namespace MQTTnet.Server | |||||
Session.WillMessage = _connectPacket.WillMessage; | Session.WillMessage = _connectPacket.WillMessage; | ||||
#pragma warning disable 4014 | |||||
Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token); | |||||
#pragma warning restore 4014 | |||||
Task.Run(() => SendPendingPacketsAsync(_cancellationToken.Token), _cancellationToken.Token).Forget(_logger); | |||||
// TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor. | // TODO: Change to single thread in SessionManager. Or use SessionManager and stats from KeepAliveMonitor. | ||||
_keepAliveMonitor.Start(_connectPacket.KeepAlivePeriod, _cancellationToken.Token); | _keepAliveMonitor.Start(_connectPacket.KeepAlivePeriod, _cancellationToken.Token); | ||||
@@ -3,6 +3,7 @@ using System.Diagnostics; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using MQTTnet.Diagnostics; | using MQTTnet.Diagnostics; | ||||
using MQTTnet.Internal; | |||||
namespace MQTTnet.Server | namespace MQTTnet.Server | ||||
{ | { | ||||
@@ -32,7 +33,7 @@ namespace MQTTnet.Server | |||||
return; | return; | ||||
} | } | ||||
Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).ConfigureAwait(false); | |||||
Task.Run(() => RunAsync(keepAlivePeriod, cancellationToken), cancellationToken).Forget(_logger); | |||||
} | } | ||||
public void Pause() | public void Pause() | ||||
@@ -47,7 +47,7 @@ namespace MQTTnet.Server | |||||
public void Start() | public void Start() | ||||
{ | { | ||||
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken); | |||||
Task.Run(() => TryProcessQueuedApplicationMessagesAsync(_cancellationToken), _cancellationToken).Forget(_logger); | |||||
} | } | ||||
public async Task StopAsync() | public async Task StopAsync() | ||||
@@ -57,7 +57,7 @@ namespace MQTTnet.Benchmarks | |||||
var receivedPacket = new ReceivedMqttPacket( | var receivedPacket = new ReceivedMqttPacket( | ||||
header.Flags, | header.Flags, | ||||
new MqttPacketBodyReader(_serializedPacket.Array, (ulong)(_serializedPacket.Count - header.RemainingLength), (ulong)_serializedPacket.Array.Length), 0); | |||||
new MqttPacketBodyReader(_serializedPacket.Array, _serializedPacket.Count - header.RemainingLength, _serializedPacket.Array.Length), 0); | |||||
_serializer.Decode(receivedPacket); | _serializer.Decode(receivedPacket); | ||||
} | } | ||||
@@ -0,0 +1,23 @@ | |||||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||||
using MQTTnet.Formatter; | |||||
namespace MQTTnet.Tests | |||||
{ | |||||
[TestClass] | |||||
public class Protocol_Tests | |||||
{ | |||||
[TestMethod] | |||||
public void Encode_Four_Byte_Integer() | |||||
{ | |||||
for (uint i = 0; i < 268435455; i++) | |||||
{ | |||||
var buffer = MqttPacketWriter.EncodeVariableLengthInteger(i); | |||||
var reader = new MqttPacketBodyReader(buffer.Array, buffer.Offset, buffer.Count); | |||||
var checkValue = reader.ReadVariableLengthInteger(); | |||||
Assert.AreEqual(i, checkValue); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -22,6 +22,27 @@ namespace MQTTnet.Tests | |||||
[TestClass] | [TestClass] | ||||
public class Server_Tests | public class Server_Tests | ||||
{ | { | ||||
[TestMethod] | |||||
public async Task Use_Empty_Client_ID() | |||||
{ | |||||
using (var testEnvironment = new TestEnvironment()) | |||||
{ | |||||
await testEnvironment.StartServerAsync(); | |||||
var client = testEnvironment.CreateClient(); | |||||
var clientOptions = new MqttClientOptionsBuilder() | |||||
.WithTcpServer("localhost", testEnvironment.ServerPort) | |||||
.WithClientId(string.Empty) | |||||
.Build(); | |||||
var connectResult = await client.ConnectAsync(clientOptions); | |||||
Assert.IsFalse(connectResult.IsSessionPresent); | |||||
Assert.IsTrue(client.IsConnected); | |||||
} | |||||
} | |||||
[TestMethod] | [TestMethod] | ||||
public async Task Publish_At_Most_Once_0x00() | public async Task Publish_At_Most_Once_0x00() | ||||
{ | { | ||||