diff --git a/Build/build.ps1 b/Build/build.ps1 index 021ca1d..021858f 100644 --- a/Build/build.ps1 +++ b/Build/build.ps1 @@ -20,11 +20,11 @@ if ($path) { &$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" # Build the RPC extension - &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m - &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m + &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" + &$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue diff --git a/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index b8cfd47..e824279 100644 --- a/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Extensions/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -4,6 +4,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using MQTTnet.Client; +using MQTTnet.Exceptions; using MQTTnet.Protocol; namespace MQTTnet.Extensions.Rpc @@ -63,10 +64,11 @@ namespace MQTTnet.Extensions.Rpc await _mqttClient.SubscribeAsync(responseTopic, qualityOfServiceLevel).ConfigureAwait(false); await _mqttClient.PublishAsync(requestMessage).ConfigureAwait(false); - + using (var timeoutCts = new CancellationTokenSource(timeout)) + using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token)) { - timeoutCts.Token.Register(() => + linkedCts.Token.Register(() => { if (!tcs.Task.IsCompleted && !tcs.Task.IsFaulted && !tcs.Task.IsCanceled) { @@ -74,9 +76,23 @@ namespace MQTTnet.Extensions.Rpc } }); - var result = await tcs.Task.ConfigureAwait(false); - timeoutCts.Cancel(false); - return result; + try + { + var result = await tcs.Task.ConfigureAwait(false); + timeoutCts.Cancel(false); + return result; + } + catch (TaskCanceledException taskCanceledException) + { + if (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + { + throw new MqttCommunicationTimedOutException(taskCanceledException); + } + else + { + throw; + } + } } } finally diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index c84b9be..babc59b 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -3,12 +3,13 @@ using System.Net.WebSockets; using System.Threading.Tasks; using MQTTnet.Adapter; using MQTTnet.Diagnostics; +using MQTTnet.Implementations; using MQTTnet.Serializer; using MQTTnet.Server; namespace MQTTnet.AspNetCore { - public sealed class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable + public class MqttWebSocketServerAdapter : IMqttServerAdapter { public event EventHandler ClientAccepted; @@ -26,8 +27,7 @@ namespace MQTTnet.AspNetCore { if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); - var channel = new MqttWebSocketServerChannel(webSocket); - var clientAdapter = new MqttChannelAdapter(channel, new MqttPacketSerializer(), new MqttNetLogger()); + var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket), new MqttPacketSerializer(), new MqttNetLogger()); var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); ClientAccepted?.Invoke(this, eventArgs); diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs deleted file mode 100644 index 82bd915..0000000 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.IO; -using System.Net.WebSockets; -using System.Threading; -using System.Threading.Tasks; -using MQTTnet.Channel; -using MQTTnet.Implementations; - -namespace MQTTnet.AspNetCore -{ - public class MqttWebSocketServerChannel : IMqttChannel - { - private WebSocket _webSocket; - private readonly MqttWebSocketChannel _channel; - - public MqttWebSocketServerChannel(WebSocket webSocket) - { - _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); - - - _channel = new MqttWebSocketChannel(webSocket); - ReceiveStream = SendStream; - } - - private Stream SendStream { get; set; } - private Stream ReceiveStream { get; set; } - - public Task ConnectAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - public async Task DisconnectAsync() - { - if (_webSocket == null) - { - return; - } - - try - { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); - } - finally - { - Dispose(); - } - } - - public Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return ReceiveStream.ReadAsync(buffer, offset, count, cancellationToken); - } - - public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return SendStream.WriteAsync(buffer, offset, count, cancellationToken); - } - - public void Dispose() - { - SendStream?.Dispose(); - ReceiveStream?.Dispose(); - - _webSocket?.Dispose(); - - SendStream = null; - ReceiveStream = null; - _webSocket = null; - } - } -} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/IMqttServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/IMqttServerAdapter.cs index bd583f5..eff9eab 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/IMqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/IMqttServerAdapter.cs @@ -4,7 +4,7 @@ using MQTTnet.Server; namespace MQTTnet.Adapter { - public interface IMqttServerAdapter + public interface IMqttServerAdapter : IDisposable { event EventHandler ClientAccepted; diff --git a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs index 79192c4..8fce65e 100644 --- a/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs @@ -134,7 +134,7 @@ namespace MQTTnet.Adapter return new ReceivedMqttPacket(header, new MemoryStream(new byte[0], false)); } - var body = header.BodyLength <= ReadBufferSize ? new MemoryStream(header.BodyLength) : new MemoryStream(); + var body = new MemoryStream(header.BodyLength); var buffer = new byte[Math.Min(ReadBufferSize, header.BodyLength)]; while (body.Length < header.BodyLength) diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs index 5035a1e..4569d12 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttPacketDispatcher.cs @@ -10,7 +10,7 @@ namespace MQTTnet.Client { public class MqttPacketDispatcher { - private readonly ConcurrentDictionary, TaskCompletionSource> _awaiters = new ConcurrentDictionary, TaskCompletionSource>(); + private readonly ConcurrentDictionary, TaskCompletionSource> _awaiters = new ConcurrentDictionary, TaskCompletionSource>(); private readonly IMqttNetLogger _logger; public MqttPacketDispatcher(IMqttNetLogger logger) @@ -40,16 +40,15 @@ namespace MQTTnet.Client { if (packet == null) throw new ArgumentNullException(nameof(packet)); - ushort? identifier = 0; - if (packet is IMqttPacketWithIdentifier packetWithIdentifier) + ushort identifier = 0; + if (packet is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue) { - identifier = packetWithIdentifier.PacketIdentifier; + identifier = packetWithIdentifier.PacketIdentifier.Value; } var type = packet.GetType(); - var key = new Tuple(identifier, type); - - + var key = new Tuple(identifier, type); + if (_awaiters.TryRemove(key, out var tcs)) { tcs.TrySetResult(packet); @@ -73,7 +72,7 @@ namespace MQTTnet.Client identifier = 0; } - var dictionaryKey = new Tuple(identifier, responseType); + var dictionaryKey = new Tuple(identifier ?? 0, responseType); if (!_awaiters.TryAdd(dictionaryKey, tcs)) { throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); @@ -89,7 +88,7 @@ namespace MQTTnet.Client identifier = 0; } - var dictionaryKey = new Tuple(identifier, responseType); + var dictionaryKey = new Tuple(identifier ?? 0, responseType); _awaiters.TryRemove(dictionaryKey, out var _); } } diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs index 0d84ab4..c6b0ff4 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.Uwp.cs @@ -9,7 +9,7 @@ using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable + public class MqttTcpServerAdapter : IMqttServerAdapter { private readonly IMqttNetLogger _logger; private StreamSocketListener _defaultEndpointSocket; diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs index c9f38ab..56457b5 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpServerAdapter.cs @@ -14,7 +14,7 @@ using MQTTnet.Server; namespace MQTTnet.Implementations { - public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable + public class MqttTcpServerAdapter : IMqttServerAdapter { private readonly IMqttNetLogger _logger; diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index 9688ced..74d2685 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -5,25 +5,18 @@ using System.Threading; using System.Threading.Tasks; using MQTTnet.Channel; using MQTTnet.Client; -using MQTTnet.Exceptions; namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttChannel { private readonly MqttClientWebSocketOptions _options; - private readonly byte[] _chunkBuffer; - - private int _chunkBufferLength; - private int _chunkBufferOffset; private WebSocket _webSocket; public MqttWebSocketChannel(MqttClientWebSocketOptions options) { _options = options ?? throw new ArgumentNullException(nameof(options)); - - _chunkBuffer = new byte[options.BufferSize]; } public MqttWebSocketChannel(WebSocket webSocket) @@ -99,32 +92,8 @@ namespace MQTTnet.Implementations public async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - var bytesRead = 0; - - while (_webSocket.State == WebSocketState.Open) - { - await EnsureFilledChunkBufferAsync(cancellationToken).ConfigureAwait(false); - if (_chunkBufferLength == 0) - { - return 0; - } - - while (count > 0 && _chunkBufferOffset < _chunkBufferLength) - { - buffer[offset] = _chunkBuffer[_chunkBufferOffset]; - _chunkBufferOffset++; - count--; - bytesRead++; - offset++; - } - - if (count == 0) - { - return bytesRead; - } - } - - return bytesRead; + var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, offset, count), cancellationToken).ConfigureAwait(false); + return response.Count; } public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -146,33 +115,5 @@ namespace MQTTnet.Implementations _webSocket = null; } } - - private async Task EnsureFilledChunkBufferAsync(CancellationToken cancellationToken) - { - if (_chunkBufferOffset < _chunkBufferLength) - { - return; - } - - if (_webSocket.State == WebSocketState.Closed) - { - throw new MqttCommunicationException("WebSocket connection closed."); - } - - var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false); - _chunkBufferLength = response.Count; - _chunkBufferOffset = 0; - - if (response.MessageType == WebSocketMessageType.Close) - { - throw new MqttCommunicationException("The WebSocket server closed the connection."); - //await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); - } - - if (response.MessageType == WebSocketMessageType.Text) - { - throw new MqttProtocolViolationException("WebSocket channel received TEXT message."); - } - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs index f1ce45d..bbfc1c7 100644 --- a/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs @@ -71,7 +71,7 @@ namespace MQTTnet.ManagedClient _connectionCancellationToken = new CancellationTokenSource(); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token).ConfigureAwait(false), _connectionCancellationToken.Token).ConfigureAwait(false); + Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _logger.Info("Started"); @@ -190,10 +190,7 @@ namespace MQTTnet.ManagedClient if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) { await SynchronizeSubscriptionsAsync().ConfigureAwait(false); - StartPublishing(); - - return; } @@ -375,7 +372,7 @@ namespace MQTTnet.ManagedClient _publishingCancellationToken = cts; #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(async () => await PublishQueuedMessagesAsync(cts.Token).ConfigureAwait(false), cts.Token).ConfigureAwait(false); + Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } diff --git a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs index a2fec80..8b9d0ba 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttServerAdapter.cs @@ -71,5 +71,9 @@ namespace MQTTnet.Core.Tests { return Task.FromResult(0); } + + public void Dispose() + { + } } } \ No newline at end of file diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index d907f1d..3de538a 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -354,13 +354,17 @@ namespace MQTTnet.TestApp.UniversalWindows { var rpcClient = new MqttRpcClient(_mqttClient); var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos); - + RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response)); } catch (MqttCommunicationTimedOutException) { RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]"); } + catch (Exception exception) + { + RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]"); + } } private void ClearRpcResponses(object sender, RoutedEventArgs e)