Przeglądaj źródła

Merge pull request #41 from JanEggers/develop

Develop
release/3.x.x
Christian 7 lat temu
committed by GitHub
rodzic
commit
b252781f8d
3 zmienionych plików z 75 dodań i 72 usunięć
  1. +49
    -72
      MQTTnet.Core/Internal/TaskExtensions.cs
  2. +7
    -0
      MQTTnet.sln.DotSettings
  3. +19
    -0
      Tests/MQTTnet.Core.Tests/ExtensionTests.cs

+ 49
- 72
MQTTnet.Core/Internal/TaskExtensions.cs Wyświetl plik

@@ -7,91 +7,68 @@ namespace MQTTnet.Core.Internal
{
public static class TaskExtensions
{
public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
public static async Task TimeoutAfter( this Task task, TimeSpan timeout )
{
var timeoutTask = Task.Delay(timeout);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (finishedTask == timeoutTask)
using ( var cancellationTokenSource = new CancellationTokenSource() )
{
throw new MqttCommunicationTimedOutException();
}
try
{
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (task.IsCanceled)
{
throw new TaskCanceledException();
}
if ( finishedTask == timeoutTask )
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception.GetBaseException());
}
if ( task.IsCanceled )
{
throw new TaskCanceledException();
}

////return TimeoutAfter(task.ContinueWith(t => 0), timeout);
if ( task.IsFaulted )
{
throw new MqttCommunicationException( task.Exception.GetBaseException() );
}
}
finally
{
cancellationTokenSource.Cancel();
}
}
}

public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout)
public static async Task<TResult> TimeoutAfter<TResult>( this Task<TResult> task, TimeSpan timeout )
{
var timeoutTask = Task.Delay(timeout);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (finishedTask == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsCanceled)
using ( var cancellationTokenSource = new CancellationTokenSource() )
{
throw new TaskCanceledException();
}
if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception.GetBaseException());
}
try
{
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

return task.Result;
if ( finishedTask == timeoutTask )
{
throw new MqttCommunicationTimedOutException();
}

//// using (var cancellationTokenSource = new CancellationTokenSource())
//// {
//// var tcs = new TaskCompletionSource<TResult>();
if ( task.IsCanceled )
{
throw new TaskCanceledException();
}

//// cancellationTokenSource.Token.Register(() =>
//// {
//// tcs.TrySetCanceled();
//// });
if ( task.IsFaulted )
{
throw new MqttCommunicationException( task.Exception.GetBaseException() );
}

//// try
//// {
////#pragma warning disable 4014
//// task.ContinueWith(t =>
////#pragma warning restore 4014
//// {
//// if (t.IsFaulted)
//// {
//// tcs.TrySetException(t.Exception);
//// }

//// if (t.IsCompleted)
//// {
//// tcs.TrySetResult(t.Result);
//// }

//// return t.Result;
//// }, cancellationTokenSource.Token).ConfigureAwait(false);

//// cancellationTokenSource.CancelAfter(timeout);
//// return await tcs.Task.ConfigureAwait(false);
//// }
//// catch (TaskCanceledException)
//// {
//// throw new MqttCommunicationTimedOutException();
//// }
//// catch (Exception e)
//// {
//// throw new MqttCommunicationException(e);
//// }
//// }
return task.Result;
}
finally
{
cancellationTokenSource.Cancel();
}
}
}
}
}

+ 7
- 0
MQTTnet.sln.DotSettings Wyświetl plik

@@ -0,0 +1,7 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_METHOD_CALL_PARENTHESES/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_METHOD_PARENTHESES/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/SPACE_WITHIN_PARENTHESES/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

+ 19
- 0
Tests/MQTTnet.Core.Tests/ExtensionTests.cs Wyświetl plik

@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Exceptions;
@@ -67,5 +68,23 @@ namespace MQTTnet.Core.Tests
Assert.IsTrue(e.InnerException is IndexOutOfRangeException);
}
}

[TestMethod]
public async Task TimeoutAfterMemoryUsage()
{
var tasks = Enumerable.Range(0, 100000)
.Select(i => Task.Delay(TimeSpan.FromMilliseconds(1)).TimeoutAfter(TimeSpan.FromMinutes(1)));

await Task.WhenAll( tasks );
AssertIsLess( 3_000_000, GC.GetTotalMemory( true ) );
}

private void AssertIsLess( long bound, long actual )
{
if ( bound < actual )
{
Assert.Fail( $"value must be less than {bound:N0} but is {actual:N0}" );
}
}
}
}

Ładowanie…
Anuluj
Zapisz