浏览代码

Fix stream and socket handling.

release/3.x.x
Christian 6 年前
父节点
当前提交
cea984cc2f
共有 10 个文件被更改,包括 171 次插入29 次删除
  1. +3
    -3
      Build/MQTTnet.AspNetCore.nuspec
  2. +7
    -6
      Build/MQTTnet.nuspec
  3. +2
    -0
      Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs
  4. +53
    -4
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs
  5. +84
    -13
      Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs
  6. +11
    -2
      Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs
  7. +1
    -0
      Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs
  8. +1
    -0
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs
  9. +8
    -0
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs
  10. +1
    -1
      Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs

+ 3
- 3
Build/MQTTnet.AspNetCore.nuspec 查看文件

@@ -2,7 +2,7 @@
<package >
<metadata>
<id>MQTTnet.AspNetCore</id>
<version>2.7.1</version>
<version>2.7.3</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,13 +10,13 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>This is a support library to integrate MQTTnet into AspNetCore.</description>
<releaseNotes>* Updated to MQTTnet 2.7.1.
<releaseNotes>* Updated to MQTTnet 2.7.3.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<group targetFramework="netstandard2.0">
<dependency id="MQTTnet" version="2.7.1" />
<dependency id="MQTTnet" version="2.7.3" />
</group>
</dependencies>
</metadata>


+ 7
- 6
Build/MQTTnet.nuspec 查看文件

@@ -2,7 +2,7 @@
<package >
<metadata>
<id>MQTTnet</id>
<version>2.7.2</version>
<version>2.7.3</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
@@ -10,11 +10,12 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Client] Added the subprotocol "mqtt" as default for web socket based connections.
* [Client] Added a new client setting called "KeepAliveSendInterval". It allows configuring the effective interval for sending ping requests.
* [Client] The client will no longer send ping requests if other packets are sent within the configured interval.
* [Server] The server now generates a valid packet identifier when disaptching publish packets to clients.
* [Core] Add several new extension methods.
<releaseNotes>* [Core] Add several new extension methods.
* [Client] Fixed an issue in _ManagedMqttClientOptionsBuilder_ when using _WithClientOptions_ and an options builder.
* [Client] Added the "IsStarted" property for the managed client.
* [Client] Optimized stream buffer for UWP apps.
* [Client] Added the _BufferSize_ to the TCP options.
* [Core] Fixed some issues in stream and socket handling.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>


+ 2
- 0
Frameworks/MQTTnet.NetStandard/Client/MqttClientTcpOptions.cs 查看文件

@@ -6,6 +6,8 @@

public int? Port { get; set; }

public int BufferSize { get; set; } = 20 * 4096;

public MqttClientTlsOptions TlsOptions { get; set; } = new MqttClientTlsOptions();
}
}

+ 53
- 4
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.Uwp.cs 查看文件

@@ -15,12 +15,19 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
{
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.

private readonly int _bufferSize = BufferSize;
private readonly MqttClientTcpOptions _options;

private StreamSocket _socket;

public MqttTcpChannel(MqttClientTcpOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_bufferSize = _options.BufferSize;
}

public MqttTcpChannel(StreamSocket socket)
@@ -69,14 +76,56 @@ namespace MQTTnet.Implementations

public void Dispose()
{
_socket?.Dispose();
_socket = null;
try
{
SendStream?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
SendStream = null;
}

try
{
ReceiveStream?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
ReceiveStream = null;
}

try
{
_socket?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
_socket = null;
}
}

private void CreateStreams()
{
SendStream = _socket.OutputStream.AsStreamForWrite();
ReceiveStream = _socket.InputStream.AsStreamForRead();
SendStream = _socket.OutputStream.AsStreamForWrite(_bufferSize);
ReceiveStream = _socket.InputStream.AsStreamForRead(_bufferSize);
}

private static Certificate LoadCertificate(MqttClientTcpOptions options)


+ 84
- 13
Frameworks/MQTTnet.NetStandard/Implementations/MqttTcpChannel.cs 查看文件

@@ -14,15 +14,19 @@ namespace MQTTnet.Implementations
{
public sealed class MqttTcpChannel : IMqttChannel, IDisposable
{
private readonly MqttClientTcpOptions _options;

//todo: this can be used with min dependency NetStandard1.6
#if NET452 || NET461
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.

private readonly int _bufferSize = BufferSize;
#else
private readonly int _bufferSize = 0;
#endif

private readonly MqttClientTcpOptions _options;

private Socket _socket;
private SslStream _sslStream;

@@ -32,6 +36,7 @@ namespace MQTTnet.Implementations
public MqttTcpChannel(MqttClientTcpOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_bufferSize = options.BufferSize;
}

/// <summary>
@@ -43,7 +48,7 @@ namespace MQTTnet.Implementations
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
_sslStream = sslStream;

CreateStreams(socket, sslStream);
CreateStreams();
}

public Stream SendStream { get; private set; }
@@ -71,7 +76,7 @@ namespace MQTTnet.Implementations
await _sslStream.AuthenticateAsClientAsync(_options.Server, LoadCertificates(_options), SslProtocols.Tls12, _options.TlsOptions.IgnoreCertificateRevocationErrors).ConfigureAwait(false);
}
CreateStreams(_socket, _sslStream);
CreateStreams();
}

public Task DisconnectAsync()
@@ -82,11 +87,70 @@ namespace MQTTnet.Implementations

public void Dispose()
{
_socket?.Dispose();
_socket = null;
var oneStreamIsUsed = SendStream != null && ReceiveStream != null && ReferenceEquals(SendStream, ReceiveStream);

try
{
SendStream?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
SendStream = null;
}

try
{
if (!oneStreamIsUsed)
{
ReceiveStream?.Dispose();
}
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
ReceiveStream = null;
}

try
{
_sslStream?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
_sslStream = null;
}

_sslStream?.Dispose();
_sslStream = null;
try
{
_socket?.Dispose();
}
catch (ObjectDisposedException)
{
}
catch (NullReferenceException)
{
}
finally
{
_socket = null;
}
}

private bool InternalUserCertificateValidationCallback(object sender, X509Certificate x509Certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
@@ -136,20 +200,27 @@ namespace MQTTnet.Implementations
return certificates;
}

private void CreateStreams(Socket socket, Stream sslStream)
private void CreateStreams()
{
var stream = sslStream ?? new NetworkStream(socket);
Stream stream;
if (_sslStream != null)
{
stream = _sslStream;
}
else
{
stream = new NetworkStream(_socket, true);
}
//todo: if branch can be used with min dependency NetStandard1.6
#if NET452 || NET461
SendStream = new BufferedStream(stream, BufferSize);
ReceiveStream = new BufferedStream(stream, BufferSize);
SendStream = new BufferedStream(stream, _bufferSize);
ReceiveStream = new BufferedStream(stream, _bufferSize);
#else
SendStream = stream;
ReceiveStream = stream;
#endif
}

}
}
#endif

+ 11
- 2
Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs 查看文件

@@ -96,8 +96,17 @@ namespace MQTTnet.Implementations

public void Dispose()
{
_webSocket?.Dispose();
_webSocket = null;
try
{
_webSocket?.Dispose();
}
catch (ObjectDisposedException)
{
}
finally
{
_webSocket = null;
}
}
}
}

+ 1
- 0
Frameworks/MQTTnet.NetStandard/ManagedClient/IManagedMqttClient.cs 查看文件

@@ -7,6 +7,7 @@ namespace MQTTnet.ManagedClient
{
public interface IManagedMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable
{
bool IsStarted { get; }
bool IsConnected { get; }

event EventHandler<MqttClientConnectedEventArgs> Connected;


+ 1
- 0
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClient.cs 查看文件

@@ -40,6 +40,7 @@ namespace MQTTnet.ManagedClient
}

public bool IsConnected => _mqttClient.IsConnected;
public bool IsStarted => _connectionCancellationToken != null;

public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;


+ 8
- 0
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientExtensions.cs 查看文件

@@ -21,6 +21,14 @@ namespace MQTTnet.ManagedClient
return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qualityOfServiceLevel).Build());
}

public static Task SubscribeAsync(this IManagedMqttClient managedClient, string topic)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return managedClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
}

public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, params string[] topicFilters)
{
if (managedClient == null) throw new ArgumentNullException(nameof(managedClient));


+ 1
- 1
Frameworks/MQTTnet.NetStandard/ManagedClient/ManagedMqttClientOptionsBuilder.cs 查看文件

@@ -47,7 +47,7 @@ namespace MQTTnet.ManagedClient
{
if (options == null) throw new ArgumentNullException(nameof(options));

if (_clientOptionsBuilder != null)
if (_clientOptionsBuilder == null)
{
_clientOptionsBuilder = new MqttClientOptionsBuilder();
}


正在加载...
取消
保存