@@ -20,11 +20,11 @@ if ($path) { | |||
&$msbuild ..\Frameworks\MQTTnet.AspNetCore\MQTTnet.AspNetCore.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
# Build the RPC extension | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net452" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="net461" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard1.3" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="netstandard2.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
&$msbuild ..\Extensions\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj /t:Build /p:Configuration="Release" /p:TargetFramework="uap10.0" /p:FileVersion=$assemblyVersion /p:AssemblyVersion=$assemblyVersion /verbosity:m /p:SignAssembly=true /p:AssemblyOriginatorKeyFile=".\..\..\Build\codeSigningKey.pfx" | |||
Remove-Item .\NuGet -Force -Recurse -ErrorAction SilentlyContinue | |||
@@ -4,6 +4,7 @@ using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Client; | |||
using MQTTnet.Exceptions; | |||
using MQTTnet.Protocol; | |||
namespace MQTTnet.Extensions.Rpc | |||
@@ -63,10 +64,11 @@ namespace MQTTnet.Extensions.Rpc | |||
await _mqttClient.SubscribeAsync(responseTopic, qualityOfServiceLevel).ConfigureAwait(false); | |||
await _mqttClient.PublishAsync(requestMessage).ConfigureAwait(false); | |||
using (var timeoutCts = new CancellationTokenSource(timeout)) | |||
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token)) | |||
{ | |||
timeoutCts.Token.Register(() => | |||
linkedCts.Token.Register(() => | |||
{ | |||
if (!tcs.Task.IsCompleted && !tcs.Task.IsFaulted && !tcs.Task.IsCanceled) | |||
{ | |||
@@ -74,9 +76,23 @@ namespace MQTTnet.Extensions.Rpc | |||
} | |||
}); | |||
var result = await tcs.Task.ConfigureAwait(false); | |||
timeoutCts.Cancel(false); | |||
return result; | |||
try | |||
{ | |||
var result = await tcs.Task.ConfigureAwait(false); | |||
timeoutCts.Cancel(false); | |||
return result; | |||
} | |||
catch (TaskCanceledException taskCanceledException) | |||
{ | |||
if (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) | |||
{ | |||
throw new MqttCommunicationTimedOutException(taskCanceledException); | |||
} | |||
else | |||
{ | |||
throw; | |||
} | |||
} | |||
} | |||
} | |||
finally | |||
@@ -3,12 +3,13 @@ using System.Net.WebSockets; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Adapter; | |||
using MQTTnet.Diagnostics; | |||
using MQTTnet.Implementations; | |||
using MQTTnet.Serializer; | |||
using MQTTnet.Server; | |||
namespace MQTTnet.AspNetCore | |||
{ | |||
public sealed class MqttWebSocketServerAdapter : IMqttServerAdapter, IDisposable | |||
public class MqttWebSocketServerAdapter : IMqttServerAdapter | |||
{ | |||
public event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; | |||
@@ -26,8 +27,7 @@ namespace MQTTnet.AspNetCore | |||
{ | |||
if (webSocket == null) throw new ArgumentNullException(nameof(webSocket)); | |||
var channel = new MqttWebSocketServerChannel(webSocket); | |||
var clientAdapter = new MqttChannelAdapter(channel, new MqttPacketSerializer(), new MqttNetLogger()); | |||
var clientAdapter = new MqttChannelAdapter(new MqttWebSocketChannel(webSocket), new MqttPacketSerializer(), new MqttNetLogger()); | |||
var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(clientAdapter); | |||
ClientAccepted?.Invoke(this, eventArgs); | |||
@@ -1,72 +0,0 @@ | |||
using System; | |||
using System.IO; | |||
using System.Net.WebSockets; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Implementations; | |||
namespace MQTTnet.AspNetCore | |||
{ | |||
public class MqttWebSocketServerChannel : IMqttChannel | |||
{ | |||
private WebSocket _webSocket; | |||
private readonly MqttWebSocketChannel _channel; | |||
public MqttWebSocketServerChannel(WebSocket webSocket) | |||
{ | |||
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); | |||
_channel = new MqttWebSocketChannel(webSocket); | |||
ReceiveStream = SendStream; | |||
} | |||
private Stream SendStream { get; set; } | |||
private Stream ReceiveStream { get; set; } | |||
public Task ConnectAsync(CancellationToken cancellationToken) | |||
{ | |||
return Task.CompletedTask; | |||
} | |||
public async Task DisconnectAsync() | |||
{ | |||
if (_webSocket == null) | |||
{ | |||
return; | |||
} | |||
try | |||
{ | |||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); | |||
} | |||
finally | |||
{ | |||
Dispose(); | |||
} | |||
} | |||
public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
{ | |||
return ReceiveStream.ReadAsync(buffer, offset, count, cancellationToken); | |||
} | |||
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
{ | |||
return SendStream.WriteAsync(buffer, offset, count, cancellationToken); | |||
} | |||
public void Dispose() | |||
{ | |||
SendStream?.Dispose(); | |||
ReceiveStream?.Dispose(); | |||
_webSocket?.Dispose(); | |||
SendStream = null; | |||
ReceiveStream = null; | |||
_webSocket = null; | |||
} | |||
} | |||
} |
@@ -4,7 +4,7 @@ using MQTTnet.Server; | |||
namespace MQTTnet.Adapter | |||
{ | |||
public interface IMqttServerAdapter | |||
public interface IMqttServerAdapter : IDisposable | |||
{ | |||
event EventHandler<MqttServerAdapterClientAcceptedEventArgs> ClientAccepted; | |||
@@ -134,7 +134,7 @@ namespace MQTTnet.Adapter | |||
return new ReceivedMqttPacket(header, new MemoryStream(new byte[0], false)); | |||
} | |||
var body = header.BodyLength <= ReadBufferSize ? new MemoryStream(header.BodyLength) : new MemoryStream(); | |||
var body = new MemoryStream(header.BodyLength); | |||
var buffer = new byte[Math.Min(ReadBufferSize, header.BodyLength)]; | |||
while (body.Length < header.BodyLength) | |||
@@ -10,7 +10,7 @@ namespace MQTTnet.Client | |||
{ | |||
public class MqttPacketDispatcher | |||
{ | |||
private readonly ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort?, Type>, TaskCompletionSource<MqttBasePacket>>(); | |||
private readonly ConcurrentDictionary<Tuple<ushort, Type>, TaskCompletionSource<MqttBasePacket>> _awaiters = new ConcurrentDictionary<Tuple<ushort, Type>, TaskCompletionSource<MqttBasePacket>>(); | |||
private readonly IMqttNetLogger _logger; | |||
public MqttPacketDispatcher(IMqttNetLogger logger) | |||
@@ -40,16 +40,15 @@ namespace MQTTnet.Client | |||
{ | |||
if (packet == null) throw new ArgumentNullException(nameof(packet)); | |||
ushort? identifier = 0; | |||
if (packet is IMqttPacketWithIdentifier packetWithIdentifier) | |||
ushort identifier = 0; | |||
if (packet is IMqttPacketWithIdentifier packetWithIdentifier && packetWithIdentifier.PacketIdentifier.HasValue) | |||
{ | |||
identifier = packetWithIdentifier.PacketIdentifier; | |||
identifier = packetWithIdentifier.PacketIdentifier.Value; | |||
} | |||
var type = packet.GetType(); | |||
var key = new Tuple<ushort?, Type>(identifier, type); | |||
var key = new Tuple<ushort, Type>(identifier, type); | |||
if (_awaiters.TryRemove(key, out var tcs)) | |||
{ | |||
tcs.TrySetResult(packet); | |||
@@ -73,7 +72,7 @@ namespace MQTTnet.Client | |||
identifier = 0; | |||
} | |||
var dictionaryKey = new Tuple<ushort?, Type>(identifier, responseType); | |||
var dictionaryKey = new Tuple<ushort, Type>(identifier ?? 0, responseType); | |||
if (!_awaiters.TryAdd(dictionaryKey, tcs)) | |||
{ | |||
throw new InvalidOperationException($"The packet dispatcher already has an awaiter for packet of type '{responseType}' with identifier {identifier}."); | |||
@@ -89,7 +88,7 @@ namespace MQTTnet.Client | |||
identifier = 0; | |||
} | |||
var dictionaryKey = new Tuple<ushort?, Type>(identifier, responseType); | |||
var dictionaryKey = new Tuple<ushort, Type>(identifier ?? 0, responseType); | |||
_awaiters.TryRemove(dictionaryKey, out var _); | |||
} | |||
} |
@@ -9,7 +9,7 @@ using MQTTnet.Server; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable | |||
public class MqttTcpServerAdapter : IMqttServerAdapter | |||
{ | |||
private readonly IMqttNetLogger _logger; | |||
private StreamSocketListener _defaultEndpointSocket; | |||
@@ -14,7 +14,7 @@ using MQTTnet.Server; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public class MqttTcpServerAdapter : IMqttServerAdapter, IDisposable | |||
public class MqttTcpServerAdapter : IMqttServerAdapter | |||
{ | |||
private readonly IMqttNetLogger _logger; | |||
@@ -5,25 +5,18 @@ using System.Threading; | |||
using System.Threading.Tasks; | |||
using MQTTnet.Channel; | |||
using MQTTnet.Client; | |||
using MQTTnet.Exceptions; | |||
namespace MQTTnet.Implementations | |||
{ | |||
public sealed class MqttWebSocketChannel : IMqttChannel | |||
{ | |||
private readonly MqttClientWebSocketOptions _options; | |||
private readonly byte[] _chunkBuffer; | |||
private int _chunkBufferLength; | |||
private int _chunkBufferOffset; | |||
private WebSocket _webSocket; | |||
public MqttWebSocketChannel(MqttClientWebSocketOptions options) | |||
{ | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_chunkBuffer = new byte[options.BufferSize]; | |||
} | |||
public MqttWebSocketChannel(WebSocket webSocket) | |||
@@ -99,32 +92,8 @@ namespace MQTTnet.Implementations | |||
public async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
{ | |||
var bytesRead = 0; | |||
while (_webSocket.State == WebSocketState.Open) | |||
{ | |||
await EnsureFilledChunkBufferAsync(cancellationToken).ConfigureAwait(false); | |||
if (_chunkBufferLength == 0) | |||
{ | |||
return 0; | |||
} | |||
while (count > 0 && _chunkBufferOffset < _chunkBufferLength) | |||
{ | |||
buffer[offset] = _chunkBuffer[_chunkBufferOffset]; | |||
_chunkBufferOffset++; | |||
count--; | |||
bytesRead++; | |||
offset++; | |||
} | |||
if (count == 0) | |||
{ | |||
return bytesRead; | |||
} | |||
} | |||
return bytesRead; | |||
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken).ConfigureAwait(false); | |||
return response.Count; | |||
} | |||
public Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | |||
@@ -146,33 +115,5 @@ namespace MQTTnet.Implementations | |||
_webSocket = null; | |||
} | |||
} | |||
private async Task EnsureFilledChunkBufferAsync(CancellationToken cancellationToken) | |||
{ | |||
if (_chunkBufferOffset < _chunkBufferLength) | |||
{ | |||
return; | |||
} | |||
if (_webSocket.State == WebSocketState.Closed) | |||
{ | |||
throw new MqttCommunicationException("WebSocket connection closed."); | |||
} | |||
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false); | |||
_chunkBufferLength = response.Count; | |||
_chunkBufferOffset = 0; | |||
if (response.MessageType == WebSocketMessageType.Close) | |||
{ | |||
throw new MqttCommunicationException("The WebSocket server closed the connection."); | |||
//await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); | |||
} | |||
if (response.MessageType == WebSocketMessageType.Text) | |||
{ | |||
throw new MqttProtocolViolationException("WebSocket channel received TEXT message."); | |||
} | |||
} | |||
} | |||
} |
@@ -71,7 +71,7 @@ namespace MQTTnet.ManagedClient | |||
_connectionCancellationToken = new CancellationTokenSource(); | |||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
Task.Run(async () => await MaintainConnectionAsync(_connectionCancellationToken.Token).ConfigureAwait(false), _connectionCancellationToken.Token).ConfigureAwait(false); | |||
Task.Run(() => MaintainConnectionAsync(_connectionCancellationToken.Token), _connectionCancellationToken.Token); | |||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
_logger.Info<ManagedMqttClient>("Started"); | |||
@@ -190,10 +190,7 @@ namespace MQTTnet.ManagedClient | |||
if (connectionState == ReconnectionResult.Reconnected || _subscriptionsNotPushed) | |||
{ | |||
await SynchronizeSubscriptionsAsync().ConfigureAwait(false); | |||
StartPublishing(); | |||
return; | |||
} | |||
@@ -375,7 +372,7 @@ namespace MQTTnet.ManagedClient | |||
_publishingCancellationToken = cts; | |||
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
Task.Run(async () => await PublishQueuedMessagesAsync(cts.Token).ConfigureAwait(false), cts.Token).ConfigureAwait(false); | |||
Task.Run(() => PublishQueuedMessagesAsync(cts.Token), cts.Token); | |||
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |||
} | |||
@@ -71,5 +71,9 @@ namespace MQTTnet.Core.Tests | |||
{ | |||
return Task.FromResult(0); | |||
} | |||
public void Dispose() | |||
{ | |||
} | |||
} | |||
} |
@@ -354,13 +354,17 @@ namespace MQTTnet.TestApp.UniversalWindows | |||
{ | |||
var rpcClient = new MqttRpcClient(_mqttClient); | |||
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), RpcMethod.Text, payload, qos); | |||
RpcResponses.Items.Add(RpcMethod.Text + " >>> " + Encoding.UTF8.GetString(response)); | |||
} | |||
catch (MqttCommunicationTimedOutException) | |||
{ | |||
RpcResponses.Items.Add(RpcMethod.Text + " >>> [TIMEOUT]"); | |||
} | |||
catch (Exception exception) | |||
{ | |||
RpcResponses.Items.Add(RpcMethod.Text + " >>> [EXCEPTION (" + exception.Message + ")]"); | |||
} | |||
} | |||
private void ClearRpcResponses(object sender, RoutedEventArgs e) | |||