diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c260e54 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +language: csharp +solution: MQTTnet.sln + +matrix: + include: + - dotnet: 2.0.0 + mono: none + dist: trusty + env: DOTNETCORE=1 # optional, can be used to take different code paths in your script + - mono: latest \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs index 92e071e..faa5eff 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttServerAdapter.cs @@ -85,7 +85,7 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); + var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); } @@ -105,10 +105,10 @@ namespace MQTTnet.Implementations { try { - var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null); + var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false); var sslStream = new SslStream(new NetworkStream(clientSocket)); - await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); + await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); diff --git a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs index 54a4ce6..32b6978 100644 --- a/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetFramework/Implementations/MqttTcpChannel.cs @@ -46,13 +46,13 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false); if (options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); _dataStream = _sslStream; - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); + await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } else { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs index e7820a3..8486ca1 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs @@ -45,13 +45,13 @@ namespace MQTTnet.Implementations _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); } - await _socket.ConnectAsync(options.Server, options.GetPort()); + await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false); if (options.TlsOptions.UseTls) { _sslStream = new SslStream(new NetworkStream(_socket, true)); _dataStream = _sslStream; - await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); + await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); } else { diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs new file mode 100644 index 0000000..8d51812 --- /dev/null +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketsChannel.cs @@ -0,0 +1,119 @@ +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.Implementations +{ + public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable + { + private ClientWebSocket _webSocket; + private const int BufferSize = 4096; + private const int BufferAmplifier = 20; + private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; + private int WebSocketBufferSize; + private int WebSocketBufferOffset; + + public MqttWebSocketsChannel() + { + _webSocket = new ClientWebSocket(); + } + + public async Task ConnectAsync(MqttClientOptions options) + { + _webSocket = null; + + try + { + _webSocket = new ClientWebSocket(); + + await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task DisconnectAsync() + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + public void Dispose() + { + if (_webSocket != null) + { + _webSocket.Dispose(); + } + } + + public Task ReadAsync(byte[] buffer) + { + return Task.WhenAll(ReadToBufferAsync(buffer)); + } + + private async Task ReadToBufferAsync(byte[] buffer) + { + var temporaryBuffer = new byte[BufferSize]; + var offset = 0; + + while (_webSocket.State == WebSocketState.Open) + { + if (WebSocketBufferSize == 0) + { + WebSocketBufferOffset = 0; + + WebSocketReceiveResult response; + do + { + response = + await _webSocket.ReceiveAsync(new ArraySegment(temporaryBuffer), CancellationToken.None); + + temporaryBuffer.CopyTo(WebSocketBuffer, offset); + offset += response.Count; + temporaryBuffer = new byte[BufferSize]; + } while (!response.EndOfMessage); + + WebSocketBufferSize = response.Count; + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + + Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + else + { + Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); + WebSocketBufferSize -= buffer.Length; + WebSocketBufferOffset += buffer.Length; + } + + return; + } + } + + public Task WriteAsync(byte[] buffer) + { + if (buffer == null) { + throw new ArgumentNullException(nameof(buffer)); + } + + try + { + return _webSocket.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, true, + CancellationToken.None); + } + catch (WebSocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj index 4043fd1..de3a289 100644 --- a/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj +++ b/Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj @@ -24,6 +24,8 @@ + + diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs index f47c494..1cd6d6f 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs @@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter; using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; using MQTTnet.Implementations; +using MQTTnet.Core.Channel; namespace MQTTnet { @@ -10,9 +11,27 @@ namespace MQTTnet { public IMqttClient CreateMqttClient(MqttClientOptions options) { - if (options == null) throw new ArgumentNullException(nameof(options)); + if (options == null) { + throw new ArgumentNullException(nameof(options)); + } - return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); + } + + private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) + { + switch (options.ConnectionType) + { + case ConnectionTypes.TCP: + case ConnectionTypes.TLS: + return new MqttTcpChannel(); + case ConnectionTypes.WS: + case ConnectionTypes.WSS: + return new MqttWebSocketsChannel(); + + default: + return null; + } } } } diff --git a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index 04b5ca7..fcea65a 100644 --- a/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -23,19 +23,19 @@ namespace MQTTnet.Core.Adapter public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); + await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false); } public async Task DisconnectAsync() { - await _channel.DisconnectAsync(); + await _channel.DisconnectAsync().ConfigureAwait(false); } public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) { MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); - await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); + await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false); } public async Task ReceivePacketAsync(TimeSpan timeout) @@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter MqttBasePacket packet; if (timeout > TimeSpan.Zero) { - packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); + packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false); } else { - packet = await PacketSerializer.DeserializeAsync(_channel); + packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false); } if (packet == null) @@ -62,7 +62,7 @@ namespace MQTTnet.Core.Adapter private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } @@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) { var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(timeoutTask, task) == timeoutTask) + if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) { throw new MqttCommunicationTimedOutException(); } diff --git a/MQTTnet.Core/Client/ConnectionType.cs b/MQTTnet.Core/Client/ConnectionType.cs new file mode 100644 index 0000000..1c6abad --- /dev/null +++ b/MQTTnet.Core/Client/ConnectionType.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace MQTTnet.Core.Client +{ + public enum ConnectionTypes + { + TCP, + TLS, + WS, + WSS + } +} diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 3481cbe..efd794d 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client { _disconnectedEventSuspended = false; - await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); @@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client StartReceivePackets(); - var response = await SendAndReceiveAsync(connectPacket); + var response = await SendAndReceiveAsync(connectPacket).ConfigureAwait(false); if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); throw new MqttConnectingFailedException(response.ConnectReturnCode); } @@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client } catch (Exception) { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); throw; } } @@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client { try { - await SendAsync(new MqttDisconnectPacket()); + await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); } finally { - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } } @@ -158,7 +158,7 @@ namespace MQTTnet.Core.Client TopicFilters = topicFilters }; - await SendAndReceiveAsync(unsubscribePacket); + await SendAndReceiveAsync(unsubscribePacket).ConfigureAwait(false); } public async Task PublishAsync(MqttApplicationMessage applicationMessage) @@ -171,18 +171,18 @@ namespace MQTTnet.Core.Client if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await SendAsync(publishPacket); + await SendAsync(publishPacket).ConfigureAwait(false); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket); + await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()); + var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); } } @@ -195,7 +195,7 @@ namespace MQTTnet.Core.Client { try { - await _adapter.DisconnectAsync(); + await _adapter.DisconnectAsync().ConfigureAwait(false); } catch (Exception exception) { @@ -301,7 +301,7 @@ namespace MQTTnet.Core.Client _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); } - await SendAsync(pubRelPacket.CreateResponse()); + await SendAsync(pubRelPacket.CreateResponse()).ConfigureAwait(false); } private Task SendAsync(MqttBasePacket packet) @@ -313,16 +313,12 @@ namespace MQTTnet.Core.Client { bool ResponsePacketSelector(MqttBasePacket p) { - var p1 = p as TResponsePacket; - if (p1 == null) + if (!(p is TResponsePacket p1)) { return false; } - var pi1 = requestPacket as IMqttPacketWithIdentifier; - var pi2 = p as IMqttPacketWithIdentifier; - - if (pi1 == null || pi2 == null) + if (!(requestPacket is IMqttPacketWithIdentifier pi1) || !(p is IMqttPacketWithIdentifier pi2)) { return true; } @@ -330,8 +326,8 @@ namespace MQTTnet.Core.Client return pi1.PacketIdentifier == pi2.PacketIdentifier; } - await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout); + await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false); } private ushort GetNewPacketIdentifier() @@ -347,19 +343,19 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - await Task.Delay(_options.KeepAlivePeriod, cancellationToken); - await SendAndReceiveAsync(new MqttPingReqPacket()); + await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); + await SendAndReceiveAsync(new MqttPingReqPacket()).ConfigureAwait(false); } } catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } finally { @@ -374,7 +370,7 @@ namespace MQTTnet.Core.Client { while (!cancellationToken.IsCancellationRequested) { - var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false); MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); StartProcessReceivedPacket(packet, cancellationToken); @@ -383,12 +379,12 @@ namespace MQTTnet.Core.Client catch (MqttCommunicationException exception) { MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } catch (Exception exception) { MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); - await DisconnectInternalAsync(); + await DisconnectInternalAsync().ConfigureAwait(false); } finally { diff --git a/MQTTnet.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs index f9b75fa..8b65299 100644 --- a/MQTTnet.Core/Client/MqttClientOptions.cs +++ b/MQTTnet.Core/Client/MqttClientOptions.cs @@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; + + public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; } } diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 9abea6d..349c137 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -29,7 +29,7 @@ namespace MQTTnet.Core.Serializer public async Task ReadToEndAsync() { await ReadFixedHeaderAsync(); - await ReadRemainingLengthAsync(); + await ReadRemainingLengthAsync().ConfigureAwait(false); if (_remainingLength == 0) { @@ -37,7 +37,7 @@ namespace MQTTnet.Core.Serializer } var buffer = new byte[_remainingLength]; - await ReadFromSourceAsync(buffer); + await ReadFromSourceAsync(buffer).ConfigureAwait(false); _remainingData.Write(buffer, 0, buffer.Length); _remainingData.Position = 0; @@ -45,12 +45,12 @@ namespace MQTTnet.Core.Serializer public async Task ReadRemainingDataByteAsync() { - return (await ReadRemainingDataAsync(1))[0]; + return (await ReadRemainingDataAsync(1).ConfigureAwait(false))[0]; } public async Task ReadRemainingDataUShortAsync() { - var buffer = await ReadRemainingDataAsync(2); + var buffer = await ReadRemainingDataAsync(2).ConfigureAwait(false); var temp = buffer[0]; buffer[0] = buffer[1]; @@ -68,7 +68,7 @@ namespace MQTTnet.Core.Serializer public async Task ReadRemainingDataWithLengthPrefixAsync() { var length = await ReadRemainingDataUShortAsync(); - return await ReadRemainingDataAsync(length); + return await ReadRemainingDataAsync(length).ConfigureAwait(false); } public Task ReadRemainingDataAsync() @@ -79,7 +79,7 @@ namespace MQTTnet.Core.Serializer public async Task ReadRemainingDataAsync(int length) { var buffer = new byte[length]; - await _remainingData.ReadAsync(buffer, 0, buffer.Length); + await _remainingData.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); return buffer; } @@ -92,7 +92,7 @@ namespace MQTTnet.Core.Serializer byte encodedByte; do { - encodedByte = await ReadStreamByteAsync(); + encodedByte = await ReadStreamByteAsync().ConfigureAwait(false); value += (encodedByte & 127) * multiplier; multiplier *= 128; if (multiplier > 128 * 128 * 128) @@ -119,13 +119,13 @@ namespace MQTTnet.Core.Serializer private async Task ReadStreamByteAsync() { var buffer = new byte[1]; - await ReadFromSourceAsync(buffer); + await ReadFromSourceAsync(buffer).ConfigureAwait(false); return buffer[0]; } private async Task ReadFixedHeaderAsync() { - FixedHeader = await ReadStreamByteAsync(); + FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false); var byteReader = new ByteReader(FixedHeader); byteReader.Read(4); diff --git a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs index f9ec70c..7d6bb81 100644 --- a/MQTTnet.Core/Serializer/MqttPacketSerializer.cs +++ b/MQTTnet.Core/Serializer/MqttPacketSerializer.cs @@ -100,18 +100,18 @@ namespace MQTTnet.Core.Serializer using (var mqttPacketReader = new MqttPacketReader(source)) { - await mqttPacketReader.ReadToEndAsync(); + await mqttPacketReader.ReadToEndAsync().ConfigureAwait(false); switch (mqttPacketReader.ControlPacketType) { case MqttControlPacketType.Connect: { - return await DeserializeConnectAsync(mqttPacketReader); + return await DeserializeConnectAsync(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.ConnAck: { - return await DeserializeConnAck(mqttPacketReader); + return await DeserializeConnAck(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.Disconnect: @@ -121,14 +121,14 @@ namespace MQTTnet.Core.Serializer case MqttControlPacketType.Publish: { - return await DeserializePublishAsync(mqttPacketReader); + return await DeserializePublishAsync(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.PubAck: { return new MqttPubAckPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; } @@ -136,7 +136,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubRecPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; } @@ -144,7 +144,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubRelPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; } @@ -152,7 +152,7 @@ namespace MQTTnet.Core.Serializer { return new MqttPubCompPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; } @@ -168,24 +168,24 @@ namespace MQTTnet.Core.Serializer case MqttControlPacketType.Subscribe: { - return await DeserializeSubscribeAsync(mqttPacketReader); + return await DeserializeSubscribeAsync(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.SubAck: { - return await DeserializeSubAck(mqttPacketReader); + return await DeserializeSubAck(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.Unsubscibe: { - return await DeserializeUnsubscribeAsync(mqttPacketReader); + return await DeserializeUnsubscribeAsync(mqttPacketReader).ConfigureAwait(false); } case MqttControlPacketType.UnsubAck: { return new MqttUnsubAckPacket { - PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() + PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; } @@ -201,12 +201,12 @@ namespace MQTTnet.Core.Serializer { var packet = new MqttUnsubscribePacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), + PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), }; while (!reader.EndOfRemainingData) { - packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync()); + packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false)); } return packet; @@ -216,14 +216,14 @@ namespace MQTTnet.Core.Serializer { var packet = new MqttSubscribePacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), + PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), }; while (!reader.EndOfRemainingData) { packet.TopicFilters.Add(new TopicFilter( await reader.ReadRemainingDataStringWithLengthPrefixAsync(), - (MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync())); + (MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false))); } return packet; @@ -236,12 +236,12 @@ namespace MQTTnet.Core.Serializer var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); var dup = fixedHeader.Read(); - var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); ushort packetIdentifier = 0; if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) { - packetIdentifier = await reader.ReadRemainingDataUShortAsync(); + packetIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); } var packet = new MqttPublishPacket @@ -250,7 +250,7 @@ namespace MQTTnet.Core.Serializer QualityOfServiceLevel = qualityOfServiceLevel, Dup = dup, Topic = topic, - Payload = await reader.ReadRemainingDataAsync(), + Payload = await reader.ReadRemainingDataAsync().ConfigureAwait(false), PacketIdentifier = packetIdentifier }; @@ -259,13 +259,13 @@ namespace MQTTnet.Core.Serializer private static async Task DeserializeConnectAsync(MqttPacketReader reader) { - await reader.ReadRemainingDataAsync(2); // Skip 2 bytes + await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); // Skip 2 bytes MqttProtocolVersion protocolVersion; - var protocolName = await reader.ReadRemainingDataAsync(4); + var protocolName = await reader.ReadRemainingDataAsync(4).ConfigureAwait(false); if (protocolName.SequenceEqual(ProtocolVersionV310Name)) { - await reader.ReadRemainingDataAsync(2); + await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); protocolVersion = MqttProtocolVersion.V310; } else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) @@ -277,8 +277,8 @@ namespace MQTTnet.Core.Serializer throw new MqttProtocolViolationException("Protocol name is not supported."); } - var protocolLevel = await reader.ReadRemainingDataByteAsync(); - var connectFlags = await reader.ReadRemainingDataByteAsync(); + var protocolLevel = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); + var connectFlags = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); var connectFlagsReader = new ByteReader(connectFlags); connectFlagsReader.Read(); // Reserved. @@ -295,26 +295,26 @@ namespace MQTTnet.Core.Serializer var passwordFlag = connectFlagsReader.Read(); var usernameFlag = connectFlagsReader.Read(); - packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync(); - packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); + packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); if (willFlag) { packet.WillMessage = new MqttApplicationMessage( - await reader.ReadRemainingDataStringWithLengthPrefixAsync(), - await reader.ReadRemainingDataWithLengthPrefixAsync(), + await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false), + await reader.ReadRemainingDataWithLengthPrefixAsync().ConfigureAwait(false), (MqttQualityOfServiceLevel)willQoS, willRetain); } if (usernameFlag) { - packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); } if (passwordFlag) { - packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); + packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); } ValidateConnectPacket(packet); @@ -325,12 +325,12 @@ namespace MQTTnet.Core.Serializer { var packet = new MqttSubAckPacket { - PacketIdentifier = await reader.ReadRemainingDataUShortAsync() + PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false) }; while (!reader.EndOfRemainingData) { - packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync()); + packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false)); } return packet; @@ -338,8 +338,8 @@ namespace MQTTnet.Core.Serializer private static async Task DeserializeConnAck(MqttPacketReader reader) { - var variableHeader1 = await reader.ReadRemainingDataByteAsync(); - var variableHeader2 = await reader.ReadRemainingDataByteAsync(); + var variableHeader1 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); + var variableHeader2 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); var packet = new MqttConnAckPacket { @@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); - await output.WriteToAsync(destination); + await output.WriteToAsync(destination).ConfigureAwait(false); } } diff --git a/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj new file mode 100644 index 0000000..7b69707 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/MQTTnet.TestApp.NetCore.csproj @@ -0,0 +1,13 @@ + + + + Exe + netcoreapp2.0 + + + + + + + + diff --git a/MQTTnet.TestApp.NetCore/Program.cs b/MQTTnet.TestApp.NetCore/Program.cs new file mode 100644 index 0000000..4644471 --- /dev/null +++ b/MQTTnet.TestApp.NetCore/Program.cs @@ -0,0 +1,171 @@ +using MQTTnet.Core; +using MQTTnet.Core.Client; +using MQTTnet.Core.Diagnostics; +using MQTTnet.Core.Packets; +using MQTTnet.Core.Protocol; +using MQTTnet.Core.Server; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.TestApp.NetCore +{ + public static class Program + { + public static void Main(string[] args) + { + Console.WriteLine("MQTTnet - TestApp.NetFramework"); + Console.WriteLine("1 = Start client"); + Console.WriteLine("2 = Start server"); + var pressedKey = Console.ReadKey(true); + if (pressedKey.Key == ConsoleKey.D1) + { + Task.Run(() => RunClientAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + else if (pressedKey.Key == ConsoleKey.D2) + { + Task.Run(() => RunServerAsync(args)); + Thread.Sleep(Timeout.Infinite); + } + } + + private static async Task RunClientAsync(string[] arguments) + { + + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttClientOptions + { + Server = "localhost", + ClientId = "XYZ", + CleanSession = true, + ConnectionType = ConnectionTypes.WS + }; + + var client = new MqttClientFactory().CreateMqttClient(options); + client.ApplicationMessageReceived += (s, e) => + { + Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); + Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); + Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); + Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); + Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); + Console.WriteLine(); + }; + + client.Connected += async (s, e) => + { + Console.WriteLine("### CONNECTED WITH SERVER ###"); + + await client.SubscribeAsync(new List + { + new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) + }); + + Console.WriteLine("### SUBSCRIBED ###"); + }; + + client.Disconnected += async (s, e) => + { + Console.WriteLine("### DISCONNECTED FROM SERVER ###"); + await Task.Delay(TimeSpan.FromSeconds(5)); + + try + { + await client.ConnectAsync(); + } + catch + { + Console.WriteLine("### RECONNECTING FAILED ###"); + } + }; + + try + { + await client.ConnectAsync(); + } + catch (Exception exception) + { + Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); + } + + Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); + + while (true) + { + Console.ReadLine(); + + var applicationMessage = new MqttApplicationMessage( + "A/B/C", + Encoding.UTF8.GetBytes("Hello World"), + MqttQualityOfServiceLevel.AtLeastOnce, + false + ); + + await client.PublishAsync(applicationMessage); + } + } + catch (Exception exception) + { + Console.WriteLine(exception); + } + } + + private static void RunServerAsync(string[] arguments) + { + MqttTrace.TraceMessagePublished += (s, e) => + { + Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); + if (e.Exception != null) + { + Console.WriteLine(e.Exception); + } + }; + + try + { + var options = new MqttServerOptions + { + ConnectionValidator = p => + { + if (p.ClientId == "SpecialClient") + { + if (p.Username != "USER" || p.Password != "PASS") + { + return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; + } + } + + return MqttConnectReturnCode.ConnectionAccepted; + } + }; + + var mqttServer = new MqttServerFactory().CreateMqttServer(options); + mqttServer.Start(); + + Console.WriteLine("Press any key to exit."); + Console.ReadLine(); + + mqttServer.Stop(); + } + catch (Exception e) + { + Console.WriteLine(e); + } + + Console.ReadLine(); + } + } +} diff --git a/MQTTnet.sln b/MQTTnet.sln index ef9ec03..4ad2ba6 100644 --- a/MQTTnet.sln +++ b/MQTTnet.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.8 +VisualStudioVersion = 15.0.26730.12 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" EndProject @@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -162,6 +164,22 @@ Global {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -173,6 +191,7 @@ Global {D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} + {4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} diff --git a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs index c8c6da6..01ec484 100644 --- a/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs +++ b/Tests/MQTTnet.Core.Tests/TestMqttCommunicationAdapter.cs @@ -18,12 +18,12 @@ namespace MQTTnet.Core.Tests public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) { - await Task.FromResult(0); + await Task.FromResult(0).ConfigureAwait(false); } public async Task DisconnectAsync() { - await Task.FromResult(0); + await Task.FromResult(0).ConfigureAwait(false); } public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) @@ -31,14 +31,14 @@ namespace MQTTnet.Core.Tests ThrowIfPartnerIsNull(); Partner.SendPacketInternal(packet); - await Task.FromResult(0); + await Task.FromResult(0).ConfigureAwait(false); } public async Task ReceivePacketAsync(TimeSpan timeout) { ThrowIfPartnerIsNull(); - return await Task.Run(() => _incomingPackets.Take()); + return await Task.Run(() => _incomingPackets.Take()).ConfigureAwait(false); } private void SendPacketInternal(MqttBasePacket packet) diff --git a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj index c865621..2bf571e 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj +++ b/Tests/MQTTnet.TestApp.NetFramework/MQTTnet.TestApp.NetFramework.csproj @@ -90,7 +90,7 @@ - {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} + {a480ef90-0eaa-4d9a-b271-47a9c47f6f7d} MQTTnet.NetFramework diff --git a/Tests/MQTTnet.TestApp.NetFramework/Program.cs b/Tests/MQTTnet.TestApp.NetFramework/Program.cs index 824cc78..427274f 100644 --- a/Tests/MQTTnet.TestApp.NetFramework/Program.cs +++ b/Tests/MQTTnet.TestApp.NetFramework/Program.cs @@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework private static async Task RunClientAsync(string[] arguments) { + MqttTrace.TraceMessagePublished += (s, e) => { Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}");