@@ -0,0 +1,10 @@ | |||||
language: csharp | |||||
solution: MQTTnet.sln | |||||
matrix: | |||||
include: | |||||
- dotnet: 2.0.0 | |||||
mono: none | |||||
dist: trusty | |||||
env: DOTNETCORE=1 # optional, can be used to take different code paths in your script | |||||
- mono: latest |
@@ -85,7 +85,7 @@ namespace MQTTnet.Implementations | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null); | |||||
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null).ConfigureAwait(false); | |||||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); | var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttPacketSerializer()); | ||||
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | ||||
} | } | ||||
@@ -105,10 +105,10 @@ namespace MQTTnet.Implementations | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null); | |||||
var clientSocket = await Task.Factory.FromAsync(_tlsEndpointSocket.BeginAccept, _tlsEndpointSocket.EndAccept, null).ConfigureAwait(false); | |||||
var sslStream = new SslStream(new NetworkStream(clientSocket)); | var sslStream = new SslStream(new NetworkStream(clientSocket)); | ||||
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false); | |||||
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false).ConfigureAwait(false); | |||||
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); | var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttPacketSerializer()); | ||||
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); | ||||
@@ -46,13 +46,13 @@ namespace MQTTnet.Implementations | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | ||||
} | } | ||||
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null); | |||||
await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.GetPort(), null).ConfigureAwait(false); | |||||
if (options.TlsOptions.UseTls) | if (options.TlsOptions.UseTls) | ||||
{ | { | ||||
_sslStream = new SslStream(new NetworkStream(_socket, true)); | _sslStream = new SslStream(new NetworkStream(_socket, true)); | ||||
_dataStream = _sslStream; | _dataStream = _sslStream; | ||||
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); | |||||
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
@@ -45,13 +45,13 @@ namespace MQTTnet.Implementations | |||||
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); | ||||
} | } | ||||
await _socket.ConnectAsync(options.Server, options.GetPort()); | |||||
await _socket.ConnectAsync(options.Server, options.GetPort()).ConfigureAwait(false); | |||||
if (options.TlsOptions.UseTls) | if (options.TlsOptions.UseTls) | ||||
{ | { | ||||
_sslStream = new SslStream(new NetworkStream(_socket, true)); | _sslStream = new SslStream(new NetworkStream(_socket, true)); | ||||
_dataStream = _sslStream; | _dataStream = _sslStream; | ||||
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation); | |||||
await _sslStream.AuthenticateAsClientAsync(options.Server, LoadCertificates(options), SslProtocols.Tls12, options.TlsOptions.CheckCertificateRevocation).ConfigureAwait(false); | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
@@ -0,0 +1,119 @@ | |||||
using MQTTnet.Core.Channel; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Exceptions; | |||||
using System; | |||||
using System.Net.WebSockets; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace MQTTnet.Implementations | |||||
{ | |||||
public sealed class MqttWebSocketsChannel : IMqttCommunicationChannel, IDisposable | |||||
{ | |||||
private ClientWebSocket _webSocket; | |||||
private const int BufferSize = 4096; | |||||
private const int BufferAmplifier = 20; | |||||
private readonly byte[] WebSocketBuffer = new byte[BufferSize * BufferAmplifier]; | |||||
private int WebSocketBufferSize; | |||||
private int WebSocketBufferOffset; | |||||
public MqttWebSocketsChannel() | |||||
{ | |||||
_webSocket = new ClientWebSocket(); | |||||
} | |||||
public async Task ConnectAsync(MqttClientOptions options) | |||||
{ | |||||
_webSocket = null; | |||||
try | |||||
{ | |||||
_webSocket = new ClientWebSocket(); | |||||
await _webSocket.ConnectAsync(new Uri(options.Server), CancellationToken.None); | |||||
} | |||||
catch (WebSocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
public async Task DisconnectAsync() | |||||
{ | |||||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
if (_webSocket != null) | |||||
{ | |||||
_webSocket.Dispose(); | |||||
} | |||||
} | |||||
public Task ReadAsync(byte[] buffer) | |||||
{ | |||||
return Task.WhenAll(ReadToBufferAsync(buffer)); | |||||
} | |||||
private async Task ReadToBufferAsync(byte[] buffer) | |||||
{ | |||||
var temporaryBuffer = new byte[BufferSize]; | |||||
var offset = 0; | |||||
while (_webSocket.State == WebSocketState.Open) | |||||
{ | |||||
if (WebSocketBufferSize == 0) | |||||
{ | |||||
WebSocketBufferOffset = 0; | |||||
WebSocketReceiveResult response; | |||||
do | |||||
{ | |||||
response = | |||||
await _webSocket.ReceiveAsync(new ArraySegment<byte>(temporaryBuffer), CancellationToken.None); | |||||
temporaryBuffer.CopyTo(WebSocketBuffer, offset); | |||||
offset += response.Count; | |||||
temporaryBuffer = new byte[BufferSize]; | |||||
} while (!response.EndOfMessage); | |||||
WebSocketBufferSize = response.Count; | |||||
if (response.MessageType == WebSocketMessageType.Close) | |||||
{ | |||||
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); | |||||
} | |||||
Buffer.BlockCopy(WebSocketBuffer, 0, buffer, 0, buffer.Length); | |||||
WebSocketBufferSize -= buffer.Length; | |||||
WebSocketBufferOffset += buffer.Length; | |||||
} | |||||
else | |||||
{ | |||||
Buffer.BlockCopy(WebSocketBuffer, WebSocketBufferOffset, buffer, 0, buffer.Length); | |||||
WebSocketBufferSize -= buffer.Length; | |||||
WebSocketBufferOffset += buffer.Length; | |||||
} | |||||
return; | |||||
} | |||||
} | |||||
public Task WriteAsync(byte[] buffer) | |||||
{ | |||||
if (buffer == null) { | |||||
throw new ArgumentNullException(nameof(buffer)); | |||||
} | |||||
try | |||||
{ | |||||
return _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, | |||||
CancellationToken.None); | |||||
} | |||||
catch (WebSocketException exception) | |||||
{ | |||||
throw new MqttCommunicationException(exception); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -24,6 +24,8 @@ | |||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="System.Net.Security" Version="4.3.1" /> | <PackageReference Include="System.Net.Security" Version="4.3.1" /> | ||||
<PackageReference Include="System.Net.WebSockets" Version="4.3.0" /> | |||||
<PackageReference Include="System.Net.WebSockets.Client" Version="4.3.1" /> | |||||
<PackageReference Include="System.Threading.Thread" Version="4.3.0" /> | <PackageReference Include="System.Threading.Thread" Version="4.3.0" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -3,6 +3,7 @@ using MQTTnet.Core.Adapter; | |||||
using MQTTnet.Core.Client; | using MQTTnet.Core.Client; | ||||
using MQTTnet.Core.Serializer; | using MQTTnet.Core.Serializer; | ||||
using MQTTnet.Implementations; | using MQTTnet.Implementations; | ||||
using MQTTnet.Core.Channel; | |||||
namespace MQTTnet | namespace MQTTnet | ||||
{ | { | ||||
@@ -10,9 +11,27 @@ namespace MQTTnet | |||||
{ | { | ||||
public IMqttClient CreateMqttClient(MqttClientOptions options) | public IMqttClient CreateMqttClient(MqttClientOptions options) | ||||
{ | { | ||||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||||
if (options == null) { | |||||
throw new ArgumentNullException(nameof(options)); | |||||
} | |||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttPacketSerializer())); | |||||
return new MqttClient(options, new MqttChannelCommunicationAdapter(GetMqttCommunicationChannel(options), new MqttPacketSerializer())); | |||||
} | |||||
private static IMqttCommunicationChannel GetMqttCommunicationChannel(MqttClientOptions options) | |||||
{ | |||||
switch (options.ConnectionType) | |||||
{ | |||||
case ConnectionTypes.TCP: | |||||
case ConnectionTypes.TLS: | |||||
return new MqttTcpChannel(); | |||||
case ConnectionTypes.WS: | |||||
case ConnectionTypes.WSS: | |||||
return new MqttWebSocketsChannel(); | |||||
default: | |||||
return null; | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -23,19 +23,19 @@ namespace MQTTnet.Core.Adapter | |||||
public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) | public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) | ||||
{ | { | ||||
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout); | |||||
await ExecuteWithTimeoutAsync(_channel.ConnectAsync(options), timeout).ConfigureAwait(false); | |||||
} | } | ||||
public async Task DisconnectAsync() | public async Task DisconnectAsync() | ||||
{ | { | ||||
await _channel.DisconnectAsync(); | |||||
await _channel.DisconnectAsync().ConfigureAwait(false); | |||||
} | } | ||||
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) | public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) | ||||
{ | { | ||||
MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); | MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); | ||||
await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout); | |||||
await ExecuteWithTimeoutAsync(PacketSerializer.SerializeAsync(packet, _channel), timeout).ConfigureAwait(false); | |||||
} | } | ||||
public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) | public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) | ||||
@@ -43,11 +43,11 @@ namespace MQTTnet.Core.Adapter | |||||
MqttBasePacket packet; | MqttBasePacket packet; | ||||
if (timeout > TimeSpan.Zero) | if (timeout > TimeSpan.Zero) | ||||
{ | { | ||||
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout); | |||||
packet = await ExecuteWithTimeoutAsync(PacketSerializer.DeserializeAsync(_channel), timeout).ConfigureAwait(false); | |||||
} | } | ||||
else | else | ||||
{ | { | ||||
packet = await PacketSerializer.DeserializeAsync(_channel); | |||||
packet = await PacketSerializer.DeserializeAsync(_channel).ConfigureAwait(false); | |||||
} | } | ||||
if (packet == null) | if (packet == null) | ||||
@@ -62,7 +62,7 @@ namespace MQTTnet.Core.Adapter | |||||
private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout) | private static async Task<TResult> ExecuteWithTimeoutAsync<TResult>(Task<TResult> task, TimeSpan timeout) | ||||
{ | { | ||||
var timeoutTask = Task.Delay(timeout); | var timeoutTask = Task.Delay(timeout); | ||||
if (await Task.WhenAny(timeoutTask, task) == timeoutTask) | |||||
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) | |||||
{ | { | ||||
throw new MqttCommunicationTimedOutException(); | throw new MqttCommunicationTimedOutException(); | ||||
} | } | ||||
@@ -78,7 +78,7 @@ namespace MQTTnet.Core.Adapter | |||||
private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) | private static async Task ExecuteWithTimeoutAsync(Task task, TimeSpan timeout) | ||||
{ | { | ||||
var timeoutTask = Task.Delay(timeout); | var timeoutTask = Task.Delay(timeout); | ||||
if (await Task.WhenAny(timeoutTask, task) == timeoutTask) | |||||
if (await Task.WhenAny(timeoutTask, task).ConfigureAwait(false) == timeoutTask) | |||||
{ | { | ||||
throw new MqttCommunicationTimedOutException(); | throw new MqttCommunicationTimedOutException(); | ||||
} | } | ||||
@@ -0,0 +1,14 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace MQTTnet.Core.Client | |||||
{ | |||||
public enum ConnectionTypes | |||||
{ | |||||
TCP, | |||||
TLS, | |||||
WS, | |||||
WSS | |||||
} | |||||
} |
@@ -53,7 +53,7 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
_disconnectedEventSuspended = false; | _disconnectedEventSuspended = false; | ||||
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); | |||||
await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout).ConfigureAwait(false); | |||||
MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); | MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); | ||||
@@ -73,10 +73,10 @@ namespace MQTTnet.Core.Client | |||||
StartReceivePackets(); | StartReceivePackets(); | ||||
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket); | |||||
var response = await SendAndReceiveAsync<MqttConnAckPacket>(connectPacket).ConfigureAwait(false); | |||||
if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) | if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) | ||||
{ | { | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
throw new MqttConnectingFailedException(response.ConnectReturnCode); | throw new MqttConnectingFailedException(response.ConnectReturnCode); | ||||
} | } | ||||
@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Client | |||||
} | } | ||||
catch (Exception) | catch (Exception) | ||||
{ | { | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
throw; | throw; | ||||
} | } | ||||
} | } | ||||
@@ -101,11 +101,11 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await SendAsync(new MqttDisconnectPacket()); | |||||
await SendAsync(new MqttDisconnectPacket()).ConfigureAwait(false); | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
@@ -158,7 +158,7 @@ namespace MQTTnet.Core.Client | |||||
TopicFilters = topicFilters | TopicFilters = topicFilters | ||||
}; | }; | ||||
await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket); | |||||
await SendAndReceiveAsync<MqttUnsubAckPacket>(unsubscribePacket).ConfigureAwait(false); | |||||
} | } | ||||
public async Task PublishAsync(MqttApplicationMessage applicationMessage) | public async Task PublishAsync(MqttApplicationMessage applicationMessage) | ||||
@@ -171,18 +171,18 @@ namespace MQTTnet.Core.Client | |||||
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) | if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) | ||||
{ | { | ||||
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] | // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] | ||||
await SendAsync(publishPacket); | |||||
await SendAsync(publishPacket).ConfigureAwait(false); | |||||
} | } | ||||
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) | else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) | ||||
{ | { | ||||
publishPacket.PacketIdentifier = GetNewPacketIdentifier(); | publishPacket.PacketIdentifier = GetNewPacketIdentifier(); | ||||
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket); | |||||
await SendAndReceiveAsync<MqttPubAckPacket>(publishPacket).ConfigureAwait(false); | |||||
} | } | ||||
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) | else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) | ||||
{ | { | ||||
publishPacket.PacketIdentifier = GetNewPacketIdentifier(); | publishPacket.PacketIdentifier = GetNewPacketIdentifier(); | ||||
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket); | |||||
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()); | |||||
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket).ConfigureAwait(false); | |||||
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>()).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
@@ -195,7 +195,7 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
await _adapter.DisconnectAsync(); | |||||
await _adapter.DisconnectAsync().ConfigureAwait(false); | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
@@ -301,7 +301,7 @@ namespace MQTTnet.Core.Client | |||||
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); | _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier); | ||||
} | } | ||||
await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()); | |||||
await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>()).ConfigureAwait(false); | |||||
} | } | ||||
private Task SendAsync(MqttBasePacket packet) | private Task SendAsync(MqttBasePacket packet) | ||||
@@ -313,16 +313,12 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
bool ResponsePacketSelector(MqttBasePacket p) | bool ResponsePacketSelector(MqttBasePacket p) | ||||
{ | { | ||||
var p1 = p as TResponsePacket; | |||||
if (p1 == null) | |||||
if (!(p is TResponsePacket p1)) | |||||
{ | { | ||||
return false; | return false; | ||||
} | } | ||||
var pi1 = requestPacket as IMqttPacketWithIdentifier; | |||||
var pi2 = p as IMqttPacketWithIdentifier; | |||||
if (pi1 == null || pi2 == null) | |||||
if (!(requestPacket is IMqttPacketWithIdentifier pi1) || !(p is IMqttPacketWithIdentifier pi2)) | |||||
{ | { | ||||
return true; | return true; | ||||
} | } | ||||
@@ -330,8 +326,8 @@ namespace MQTTnet.Core.Client | |||||
return pi1.PacketIdentifier == pi2.PacketIdentifier; | return pi1.PacketIdentifier == pi2.PacketIdentifier; | ||||
} | } | ||||
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); | |||||
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout); | |||||
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout).ConfigureAwait(false); | |||||
return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout).ConfigureAwait(false); | |||||
} | } | ||||
private ushort GetNewPacketIdentifier() | private ushort GetNewPacketIdentifier() | ||||
@@ -347,19 +343,19 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested) | while (!cancellationToken.IsCancellationRequested) | ||||
{ | { | ||||
await Task.Delay(_options.KeepAlivePeriod, cancellationToken); | |||||
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()); | |||||
await Task.Delay(_options.KeepAlivePeriod, cancellationToken).ConfigureAwait(false); | |||||
await SendAndReceiveAsync<MqttPingRespPacket>(new MqttPingReqPacket()).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
catch (MqttCommunicationException exception) | catch (MqttCommunicationException exception) | ||||
{ | { | ||||
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); | MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); | MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
@@ -374,7 +370,7 @@ namespace MQTTnet.Core.Client | |||||
{ | { | ||||
while (!cancellationToken.IsCancellationRequested) | while (!cancellationToken.IsCancellationRequested) | ||||
{ | { | ||||
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero); | |||||
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero).ConfigureAwait(false); | |||||
MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); | MqttTrace.Information(nameof(MqttClient), $"Received <<< {packet}"); | ||||
StartProcessReceivedPacket(packet, cancellationToken); | StartProcessReceivedPacket(packet, cancellationToken); | ||||
@@ -383,12 +379,12 @@ namespace MQTTnet.Core.Client | |||||
catch (MqttCommunicationException exception) | catch (MqttCommunicationException exception) | ||||
{ | { | ||||
MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); | MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication exception while receiving packets."); | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
} | } | ||||
catch (Exception exception) | catch (Exception exception) | ||||
{ | { | ||||
MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); | MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while receiving packets."); | ||||
await DisconnectInternalAsync(); | |||||
await DisconnectInternalAsync().ConfigureAwait(false); | |||||
} | } | ||||
finally | finally | ||||
{ | { | ||||
@@ -24,5 +24,7 @@ namespace MQTTnet.Core.Client | |||||
public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10); | ||||
public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | public MqttProtocolVersion ProtocolVersion { get; set; } = MqttProtocolVersion.V311; | ||||
public ConnectionTypes ConnectionType { get; set; } = ConnectionTypes.TCP; | |||||
} | } | ||||
} | } |
@@ -29,7 +29,7 @@ namespace MQTTnet.Core.Serializer | |||||
public async Task ReadToEndAsync() | public async Task ReadToEndAsync() | ||||
{ | { | ||||
await ReadFixedHeaderAsync(); | await ReadFixedHeaderAsync(); | ||||
await ReadRemainingLengthAsync(); | |||||
await ReadRemainingLengthAsync().ConfigureAwait(false); | |||||
if (_remainingLength == 0) | if (_remainingLength == 0) | ||||
{ | { | ||||
@@ -37,7 +37,7 @@ namespace MQTTnet.Core.Serializer | |||||
} | } | ||||
var buffer = new byte[_remainingLength]; | var buffer = new byte[_remainingLength]; | ||||
await ReadFromSourceAsync(buffer); | |||||
await ReadFromSourceAsync(buffer).ConfigureAwait(false); | |||||
_remainingData.Write(buffer, 0, buffer.Length); | _remainingData.Write(buffer, 0, buffer.Length); | ||||
_remainingData.Position = 0; | _remainingData.Position = 0; | ||||
@@ -45,12 +45,12 @@ namespace MQTTnet.Core.Serializer | |||||
public async Task<byte> ReadRemainingDataByteAsync() | public async Task<byte> ReadRemainingDataByteAsync() | ||||
{ | { | ||||
return (await ReadRemainingDataAsync(1))[0]; | |||||
return (await ReadRemainingDataAsync(1).ConfigureAwait(false))[0]; | |||||
} | } | ||||
public async Task<ushort> ReadRemainingDataUShortAsync() | public async Task<ushort> ReadRemainingDataUShortAsync() | ||||
{ | { | ||||
var buffer = await ReadRemainingDataAsync(2); | |||||
var buffer = await ReadRemainingDataAsync(2).ConfigureAwait(false); | |||||
var temp = buffer[0]; | var temp = buffer[0]; | ||||
buffer[0] = buffer[1]; | buffer[0] = buffer[1]; | ||||
@@ -68,7 +68,7 @@ namespace MQTTnet.Core.Serializer | |||||
public async Task<byte[]> ReadRemainingDataWithLengthPrefixAsync() | public async Task<byte[]> ReadRemainingDataWithLengthPrefixAsync() | ||||
{ | { | ||||
var length = await ReadRemainingDataUShortAsync(); | var length = await ReadRemainingDataUShortAsync(); | ||||
return await ReadRemainingDataAsync(length); | |||||
return await ReadRemainingDataAsync(length).ConfigureAwait(false); | |||||
} | } | ||||
public Task<byte[]> ReadRemainingDataAsync() | public Task<byte[]> ReadRemainingDataAsync() | ||||
@@ -79,7 +79,7 @@ namespace MQTTnet.Core.Serializer | |||||
public async Task<byte[]> ReadRemainingDataAsync(int length) | public async Task<byte[]> ReadRemainingDataAsync(int length) | ||||
{ | { | ||||
var buffer = new byte[length]; | var buffer = new byte[length]; | ||||
await _remainingData.ReadAsync(buffer, 0, buffer.Length); | |||||
await _remainingData.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); | |||||
return buffer; | return buffer; | ||||
} | } | ||||
@@ -92,7 +92,7 @@ namespace MQTTnet.Core.Serializer | |||||
byte encodedByte; | byte encodedByte; | ||||
do | do | ||||
{ | { | ||||
encodedByte = await ReadStreamByteAsync(); | |||||
encodedByte = await ReadStreamByteAsync().ConfigureAwait(false); | |||||
value += (encodedByte & 127) * multiplier; | value += (encodedByte & 127) * multiplier; | ||||
multiplier *= 128; | multiplier *= 128; | ||||
if (multiplier > 128 * 128 * 128) | if (multiplier > 128 * 128 * 128) | ||||
@@ -119,13 +119,13 @@ namespace MQTTnet.Core.Serializer | |||||
private async Task<byte> ReadStreamByteAsync() | private async Task<byte> ReadStreamByteAsync() | ||||
{ | { | ||||
var buffer = new byte[1]; | var buffer = new byte[1]; | ||||
await ReadFromSourceAsync(buffer); | |||||
await ReadFromSourceAsync(buffer).ConfigureAwait(false); | |||||
return buffer[0]; | return buffer[0]; | ||||
} | } | ||||
private async Task ReadFixedHeaderAsync() | private async Task ReadFixedHeaderAsync() | ||||
{ | { | ||||
FixedHeader = await ReadStreamByteAsync(); | |||||
FixedHeader = await ReadStreamByteAsync().ConfigureAwait(false); | |||||
var byteReader = new ByteReader(FixedHeader); | var byteReader = new ByteReader(FixedHeader); | ||||
byteReader.Read(4); | byteReader.Read(4); | ||||
@@ -100,18 +100,18 @@ namespace MQTTnet.Core.Serializer | |||||
using (var mqttPacketReader = new MqttPacketReader(source)) | using (var mqttPacketReader = new MqttPacketReader(source)) | ||||
{ | { | ||||
await mqttPacketReader.ReadToEndAsync(); | |||||
await mqttPacketReader.ReadToEndAsync().ConfigureAwait(false); | |||||
switch (mqttPacketReader.ControlPacketType) | switch (mqttPacketReader.ControlPacketType) | ||||
{ | { | ||||
case MqttControlPacketType.Connect: | case MqttControlPacketType.Connect: | ||||
{ | { | ||||
return await DeserializeConnectAsync(mqttPacketReader); | |||||
return await DeserializeConnectAsync(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.ConnAck: | case MqttControlPacketType.ConnAck: | ||||
{ | { | ||||
return await DeserializeConnAck(mqttPacketReader); | |||||
return await DeserializeConnAck(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.Disconnect: | case MqttControlPacketType.Disconnect: | ||||
@@ -121,14 +121,14 @@ namespace MQTTnet.Core.Serializer | |||||
case MqttControlPacketType.Publish: | case MqttControlPacketType.Publish: | ||||
{ | { | ||||
return await DeserializePublishAsync(mqttPacketReader); | |||||
return await DeserializePublishAsync(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.PubAck: | case MqttControlPacketType.PubAck: | ||||
{ | { | ||||
return new MqttPubAckPacket | return new MqttPubAckPacket | ||||
{ | { | ||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
} | } | ||||
@@ -136,7 +136,7 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
return new MqttPubRecPacket | return new MqttPubRecPacket | ||||
{ | { | ||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
} | } | ||||
@@ -144,7 +144,7 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
return new MqttPubRelPacket | return new MqttPubRelPacket | ||||
{ | { | ||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
} | } | ||||
@@ -152,7 +152,7 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
return new MqttPubCompPacket | return new MqttPubCompPacket | ||||
{ | { | ||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
} | } | ||||
@@ -168,24 +168,24 @@ namespace MQTTnet.Core.Serializer | |||||
case MqttControlPacketType.Subscribe: | case MqttControlPacketType.Subscribe: | ||||
{ | { | ||||
return await DeserializeSubscribeAsync(mqttPacketReader); | |||||
return await DeserializeSubscribeAsync(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.SubAck: | case MqttControlPacketType.SubAck: | ||||
{ | { | ||||
return await DeserializeSubAck(mqttPacketReader); | |||||
return await DeserializeSubAck(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.Unsubscibe: | case MqttControlPacketType.Unsubscibe: | ||||
{ | { | ||||
return await DeserializeUnsubscribeAsync(mqttPacketReader); | |||||
return await DeserializeUnsubscribeAsync(mqttPacketReader).ConfigureAwait(false); | |||||
} | } | ||||
case MqttControlPacketType.UnsubAck: | case MqttControlPacketType.UnsubAck: | ||||
{ | { | ||||
return new MqttUnsubAckPacket | return new MqttUnsubAckPacket | ||||
{ | { | ||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await mqttPacketReader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
} | } | ||||
@@ -201,12 +201,12 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
var packet = new MqttUnsubscribePacket | var packet = new MqttUnsubscribePacket | ||||
{ | { | ||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), | |||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), | |||||
}; | }; | ||||
while (!reader.EndOfRemainingData) | while (!reader.EndOfRemainingData) | ||||
{ | { | ||||
packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync()); | |||||
packet.TopicFilters.Add(await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false)); | |||||
} | } | ||||
return packet; | return packet; | ||||
@@ -216,14 +216,14 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
var packet = new MqttSubscribePacket | var packet = new MqttSubscribePacket | ||||
{ | { | ||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync(), | |||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false), | |||||
}; | }; | ||||
while (!reader.EndOfRemainingData) | while (!reader.EndOfRemainingData) | ||||
{ | { | ||||
packet.TopicFilters.Add(new TopicFilter( | packet.TopicFilters.Add(new TopicFilter( | ||||
await reader.ReadRemainingDataStringWithLengthPrefixAsync(), | await reader.ReadRemainingDataStringWithLengthPrefixAsync(), | ||||
(MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync())); | |||||
(MqttQualityOfServiceLevel)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false))); | |||||
} | } | ||||
return packet; | return packet; | ||||
@@ -236,12 +236,12 @@ namespace MQTTnet.Core.Serializer | |||||
var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); | var qualityOfServiceLevel = (MqttQualityOfServiceLevel)fixedHeader.Read(2); | ||||
var dup = fixedHeader.Read(); | var dup = fixedHeader.Read(); | ||||
var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); | |||||
var topic = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); | |||||
ushort packetIdentifier = 0; | ushort packetIdentifier = 0; | ||||
if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | if (qualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce) | ||||
{ | { | ||||
packetIdentifier = await reader.ReadRemainingDataUShortAsync(); | |||||
packetIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); | |||||
} | } | ||||
var packet = new MqttPublishPacket | var packet = new MqttPublishPacket | ||||
@@ -250,7 +250,7 @@ namespace MQTTnet.Core.Serializer | |||||
QualityOfServiceLevel = qualityOfServiceLevel, | QualityOfServiceLevel = qualityOfServiceLevel, | ||||
Dup = dup, | Dup = dup, | ||||
Topic = topic, | Topic = topic, | ||||
Payload = await reader.ReadRemainingDataAsync(), | |||||
Payload = await reader.ReadRemainingDataAsync().ConfigureAwait(false), | |||||
PacketIdentifier = packetIdentifier | PacketIdentifier = packetIdentifier | ||||
}; | }; | ||||
@@ -259,13 +259,13 @@ namespace MQTTnet.Core.Serializer | |||||
private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader) | private static async Task<MqttBasePacket> DeserializeConnectAsync(MqttPacketReader reader) | ||||
{ | { | ||||
await reader.ReadRemainingDataAsync(2); // Skip 2 bytes | |||||
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); // Skip 2 bytes | |||||
MqttProtocolVersion protocolVersion; | MqttProtocolVersion protocolVersion; | ||||
var protocolName = await reader.ReadRemainingDataAsync(4); | |||||
var protocolName = await reader.ReadRemainingDataAsync(4).ConfigureAwait(false); | |||||
if (protocolName.SequenceEqual(ProtocolVersionV310Name)) | if (protocolName.SequenceEqual(ProtocolVersionV310Name)) | ||||
{ | { | ||||
await reader.ReadRemainingDataAsync(2); | |||||
await reader.ReadRemainingDataAsync(2).ConfigureAwait(false); | |||||
protocolVersion = MqttProtocolVersion.V310; | protocolVersion = MqttProtocolVersion.V310; | ||||
} | } | ||||
else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) | else if (protocolName.SequenceEqual(ProtocolVersionV311Name)) | ||||
@@ -277,8 +277,8 @@ namespace MQTTnet.Core.Serializer | |||||
throw new MqttProtocolViolationException("Protocol name is not supported."); | throw new MqttProtocolViolationException("Protocol name is not supported."); | ||||
} | } | ||||
var protocolLevel = await reader.ReadRemainingDataByteAsync(); | |||||
var connectFlags = await reader.ReadRemainingDataByteAsync(); | |||||
var protocolLevel = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); | |||||
var connectFlags = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); | |||||
var connectFlagsReader = new ByteReader(connectFlags); | var connectFlagsReader = new ByteReader(connectFlags); | ||||
connectFlagsReader.Read(); // Reserved. | connectFlagsReader.Read(); // Reserved. | ||||
@@ -295,26 +295,26 @@ namespace MQTTnet.Core.Serializer | |||||
var passwordFlag = connectFlagsReader.Read(); | var passwordFlag = connectFlagsReader.Read(); | ||||
var usernameFlag = connectFlagsReader.Read(); | var usernameFlag = connectFlagsReader.Read(); | ||||
packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync(); | |||||
packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); | |||||
packet.KeepAlivePeriod = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false); | |||||
packet.ClientId = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); | |||||
if (willFlag) | if (willFlag) | ||||
{ | { | ||||
packet.WillMessage = new MqttApplicationMessage( | packet.WillMessage = new MqttApplicationMessage( | ||||
await reader.ReadRemainingDataStringWithLengthPrefixAsync(), | |||||
await reader.ReadRemainingDataWithLengthPrefixAsync(), | |||||
await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false), | |||||
await reader.ReadRemainingDataWithLengthPrefixAsync().ConfigureAwait(false), | |||||
(MqttQualityOfServiceLevel)willQoS, | (MqttQualityOfServiceLevel)willQoS, | ||||
willRetain); | willRetain); | ||||
} | } | ||||
if (usernameFlag) | if (usernameFlag) | ||||
{ | { | ||||
packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); | |||||
packet.Username = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); | |||||
} | } | ||||
if (passwordFlag) | if (passwordFlag) | ||||
{ | { | ||||
packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync(); | |||||
packet.Password = await reader.ReadRemainingDataStringWithLengthPrefixAsync().ConfigureAwait(false); | |||||
} | } | ||||
ValidateConnectPacket(packet); | ValidateConnectPacket(packet); | ||||
@@ -325,12 +325,12 @@ namespace MQTTnet.Core.Serializer | |||||
{ | { | ||||
var packet = new MqttSubAckPacket | var packet = new MqttSubAckPacket | ||||
{ | { | ||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync() | |||||
PacketIdentifier = await reader.ReadRemainingDataUShortAsync().ConfigureAwait(false) | |||||
}; | }; | ||||
while (!reader.EndOfRemainingData) | while (!reader.EndOfRemainingData) | ||||
{ | { | ||||
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync()); | |||||
packet.SubscribeReturnCodes.Add((MqttSubscribeReturnCode)await reader.ReadRemainingDataByteAsync().ConfigureAwait(false)); | |||||
} | } | ||||
return packet; | return packet; | ||||
@@ -338,8 +338,8 @@ namespace MQTTnet.Core.Serializer | |||||
private static async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader) | private static async Task<MqttBasePacket> DeserializeConnAck(MqttPacketReader reader) | ||||
{ | { | ||||
var variableHeader1 = await reader.ReadRemainingDataByteAsync(); | |||||
var variableHeader2 = await reader.ReadRemainingDataByteAsync(); | |||||
var variableHeader1 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); | |||||
var variableHeader2 = await reader.ReadRemainingDataByteAsync().ConfigureAwait(false); | |||||
var packet = new MqttConnAckPacket | var packet = new MqttConnAckPacket | ||||
{ | { | ||||
@@ -457,7 +457,7 @@ namespace MQTTnet.Core.Serializer | |||||
output.Write(packet.PacketIdentifier); | output.Write(packet.PacketIdentifier); | ||||
output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); | output.InjectFixedHeader(MqttControlPacketType.PubRel, 0x02); | ||||
await output.WriteToAsync(destination); | |||||
await output.WriteToAsync(destination).ConfigureAwait(false); | |||||
} | } | ||||
} | } | ||||
@@ -0,0 +1,13 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\Frameworks\MQTTnet.Netstandard\MQTTnet.NetStandard.csproj" /> | |||||
<ProjectReference Include="..\MQTTnet.Core\MQTTnet.Core.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,171 @@ | |||||
using MQTTnet.Core; | |||||
using MQTTnet.Core.Client; | |||||
using MQTTnet.Core.Diagnostics; | |||||
using MQTTnet.Core.Packets; | |||||
using MQTTnet.Core.Protocol; | |||||
using MQTTnet.Core.Server; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace MQTTnet.TestApp.NetCore | |||||
{ | |||||
public static class Program | |||||
{ | |||||
public static void Main(string[] args) | |||||
{ | |||||
Console.WriteLine("MQTTnet - TestApp.NetFramework"); | |||||
Console.WriteLine("1 = Start client"); | |||||
Console.WriteLine("2 = Start server"); | |||||
var pressedKey = Console.ReadKey(true); | |||||
if (pressedKey.Key == ConsoleKey.D1) | |||||
{ | |||||
Task.Run(() => RunClientAsync(args)); | |||||
Thread.Sleep(Timeout.Infinite); | |||||
} | |||||
else if (pressedKey.Key == ConsoleKey.D2) | |||||
{ | |||||
Task.Run(() => RunServerAsync(args)); | |||||
Thread.Sleep(Timeout.Infinite); | |||||
} | |||||
} | |||||
private static async Task RunClientAsync(string[] arguments) | |||||
{ | |||||
MqttTrace.TraceMessagePublished += (s, e) => | |||||
{ | |||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||||
if (e.Exception != null) | |||||
{ | |||||
Console.WriteLine(e.Exception); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
var options = new MqttClientOptions | |||||
{ | |||||
Server = "localhost", | |||||
ClientId = "XYZ", | |||||
CleanSession = true, | |||||
ConnectionType = ConnectionTypes.WS | |||||
}; | |||||
var client = new MqttClientFactory().CreateMqttClient(options); | |||||
client.ApplicationMessageReceived += (s, e) => | |||||
{ | |||||
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); | |||||
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); | |||||
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); | |||||
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); | |||||
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); | |||||
Console.WriteLine(); | |||||
}; | |||||
client.Connected += async (s, e) => | |||||
{ | |||||
Console.WriteLine("### CONNECTED WITH SERVER ###"); | |||||
await client.SubscribeAsync(new List<TopicFilter> | |||||
{ | |||||
new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce) | |||||
}); | |||||
Console.WriteLine("### SUBSCRIBED ###"); | |||||
}; | |||||
client.Disconnected += async (s, e) => | |||||
{ | |||||
Console.WriteLine("### DISCONNECTED FROM SERVER ###"); | |||||
await Task.Delay(TimeSpan.FromSeconds(5)); | |||||
try | |||||
{ | |||||
await client.ConnectAsync(); | |||||
} | |||||
catch | |||||
{ | |||||
Console.WriteLine("### RECONNECTING FAILED ###"); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
await client.ConnectAsync(); | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
Console.WriteLine("### CONNECTING FAILED ###" + Environment.NewLine + exception); | |||||
} | |||||
Console.WriteLine("### WAITING FOR APPLICATION MESSAGES ###"); | |||||
while (true) | |||||
{ | |||||
Console.ReadLine(); | |||||
var applicationMessage = new MqttApplicationMessage( | |||||
"A/B/C", | |||||
Encoding.UTF8.GetBytes("Hello World"), | |||||
MqttQualityOfServiceLevel.AtLeastOnce, | |||||
false | |||||
); | |||||
await client.PublishAsync(applicationMessage); | |||||
} | |||||
} | |||||
catch (Exception exception) | |||||
{ | |||||
Console.WriteLine(exception); | |||||
} | |||||
} | |||||
private static void RunServerAsync(string[] arguments) | |||||
{ | |||||
MqttTrace.TraceMessagePublished += (s, e) => | |||||
{ | |||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | |||||
if (e.Exception != null) | |||||
{ | |||||
Console.WriteLine(e.Exception); | |||||
} | |||||
}; | |||||
try | |||||
{ | |||||
var options = new MqttServerOptions | |||||
{ | |||||
ConnectionValidator = p => | |||||
{ | |||||
if (p.ClientId == "SpecialClient") | |||||
{ | |||||
if (p.Username != "USER" || p.Password != "PASS") | |||||
{ | |||||
return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; | |||||
} | |||||
} | |||||
return MqttConnectReturnCode.ConnectionAccepted; | |||||
} | |||||
}; | |||||
var mqttServer = new MqttServerFactory().CreateMqttServer(options); | |||||
mqttServer.Start(); | |||||
Console.WriteLine("Press any key to exit."); | |||||
Console.ReadLine(); | |||||
mqttServer.Stop(); | |||||
} | |||||
catch (Exception e) | |||||
{ | |||||
Console.WriteLine(e); | |||||
} | |||||
Console.ReadLine(); | |||||
} | |||||
} | |||||
} |
@@ -1,7 +1,7 @@ | |||||
| | ||||
Microsoft Visual Studio Solution File, Format Version 12.00 | Microsoft Visual Studio Solution File, Format Version 12.00 | ||||
# Visual Studio 15 | # Visual Studio 15 | ||||
VisualStudioVersion = 15.0.26730.8 | |||||
VisualStudioVersion = 15.0.26730.12 | |||||
MinimumVisualStudioVersion = 10.0.40219.1 | MinimumVisualStudioVersion = 10.0.40219.1 | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" | Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}" | ||||
EndProject | EndProject | ||||
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution | |||||
README.md = README.md | README.md = README.md | ||||
EndProjectSection | EndProjectSection | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MQTTnet.TestApp.NetCore", "MQTTnet.TestApp.NetCore\MQTTnet.TestApp.NetCore.csproj", "{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}" | |||||
EndProject | |||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
Debug|Any CPU = Debug|Any CPU | Debug|Any CPU = Debug|Any CPU | ||||
@@ -162,6 +164,22 @@ Global | |||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x64.Build.0 = Release|Any CPU | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.ActiveCfg = Release|Any CPU | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU | {3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Release|x86.Build.0 = Release|Any CPU | ||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|ARM.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x64.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.ActiveCfg = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Debug|x86.Build.0 = Debug|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|ARM.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x64.Build.0 = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.ActiveCfg = Release|Any CPU | |||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453}.Release|x86.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
@@ -173,6 +191,7 @@ Global | |||||
{D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {D9D74F33-6943-49B2-B765-7BD589082098} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | {FF1F72D6-9524-4422-9497-3CC0002216ED} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | ||||
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | {3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB} | ||||
{4094DDF0-1DFB-4FA7-A2FA-AD5A44545453} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4} | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894} | ||||
@@ -18,12 +18,12 @@ namespace MQTTnet.Core.Tests | |||||
public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) | public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) | ||||
{ | { | ||||
await Task.FromResult(0); | |||||
await Task.FromResult(0).ConfigureAwait(false); | |||||
} | } | ||||
public async Task DisconnectAsync() | public async Task DisconnectAsync() | ||||
{ | { | ||||
await Task.FromResult(0); | |||||
await Task.FromResult(0).ConfigureAwait(false); | |||||
} | } | ||||
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) | public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) | ||||
@@ -31,14 +31,14 @@ namespace MQTTnet.Core.Tests | |||||
ThrowIfPartnerIsNull(); | ThrowIfPartnerIsNull(); | ||||
Partner.SendPacketInternal(packet); | Partner.SendPacketInternal(packet); | ||||
await Task.FromResult(0); | |||||
await Task.FromResult(0).ConfigureAwait(false); | |||||
} | } | ||||
public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) | public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout) | ||||
{ | { | ||||
ThrowIfPartnerIsNull(); | ThrowIfPartnerIsNull(); | ||||
return await Task.Run(() => _incomingPackets.Take()); | |||||
return await Task.Run(() => _incomingPackets.Take()).ConfigureAwait(false); | |||||
} | } | ||||
private void SendPacketInternal(MqttBasePacket packet) | private void SendPacketInternal(MqttBasePacket packet) | ||||
@@ -90,7 +90,7 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\..\Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj"> | <ProjectReference Include="..\..\Frameworks\MQTTnet.NetFramework\MQTTnet.NetFramework.csproj"> | ||||
<Project>{A480EF90-0EAA-4D9A-B271-47A9C47F6F7D}</Project> | |||||
<Project>{a480ef90-0eaa-4d9a-b271-47a9c47f6f7d}</Project> | |||||
<Name>MQTTnet.NetFramework</Name> | <Name>MQTTnet.NetFramework</Name> | ||||
</ProjectReference> | </ProjectReference> | ||||
<ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj"> | <ProjectReference Include="..\..\MQTTnet.Core\MQTTnet.Core.csproj"> | ||||
@@ -34,6 +34,7 @@ namespace MQTTnet.TestApp.NetFramework | |||||
private static async Task RunClientAsync(string[] arguments) | private static async Task RunClientAsync(string[] arguments) | ||||
{ | { | ||||
MqttTrace.TraceMessagePublished += (s, e) => | MqttTrace.TraceMessagePublished += (s, e) => | ||||
{ | { | ||||
Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | Console.WriteLine($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}"); | ||||