@@ -18,10 +18,16 @@ namespace MQTTnet.Implementations
{
{
readonly IMqttClientOptions _clientOptions;
readonly IMqttClientOptions _clientOptions;
readonly MqttClientTcpOptions _tcpOptions;
readonly MqttClientTcpOptions _tcpOptions;
readonly Action _disposeAction;
Stream _stream;
Stream _stream;
public MqttTcpChannel(IMqttClientOptions clientOptions)
public MqttTcpChannel()
{
_disposeAction = Dispose;
}
public MqttTcpChannel(IMqttClientOptions clientOptions) : this()
{
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
_tcpOptions = (MqttClientTcpOptions)clientOptions.ChannelOptions;
_tcpOptions = (MqttClientTcpOptions)clientOptions.ChannelOptions;
@@ -29,7 +35,7 @@ namespace MQTTnet.Implementations
IsSecureConnection = clientOptions.ChannelOptions?.TlsOptions?.UseTls == true;
IsSecureConnection = clientOptions.ChannelOptions?.TlsOptions?.UseTls == true;
}
}
public MqttTcpChannel(Stream stream, string endpoint, X509Certificate2 clientCertificate)
public MqttTcpChannel(Stream stream, string endpoint, X509Certificate2 clientCertificate) : this()
{
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
@@ -149,11 +155,15 @@ namespace MQTTnet.Implementations
return 0;
return 0;
}
}
#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
return await stream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
using (cancellationToken.Register(_disposeAction ))
{
{
return await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
return await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
#endif
}
}
catch (ObjectDisposedException)
catch (ObjectDisposedException)
{
{
@@ -177,18 +187,22 @@ namespace MQTTnet.Implementations
try
try
{
{
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
{
var stream = _stream;
var stream = _stream;
if (stream == null)
{
throw new MqttCommunicationException("The TCP connection is closed.");
}
if (stream == null)
{
throw new MqttCommunicationException("The TCP connection is closed.");
}
#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
await stream.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(_disposeAction))
{
await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
}
#endif
}
}
catch (ObjectDisposedException)
catch (ObjectDisposedException)
{
{