Browse Source

improved timeout after memory usage

release/3.x.x
Eggers Jan 7 years ago
parent
commit
b5b87f5272
2 changed files with 68 additions and 72 deletions
  1. +49
    -72
      MQTTnet.Core/Internal/TaskExtensions.cs
  2. +19
    -0
      Tests/MQTTnet.Core.Tests/ExtensionTests.cs

+ 49
- 72
MQTTnet.Core/Internal/TaskExtensions.cs View File

@@ -7,91 +7,68 @@ namespace MQTTnet.Core.Internal
{
public static class TaskExtensions
{
public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
public static async Task TimeoutAfter( this Task task, TimeSpan timeout )
{
var timeoutTask = Task.Delay(timeout);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (finishedTask == timeoutTask)
using ( var cancellationTokenSource = new CancellationTokenSource() )
{
throw new MqttCommunicationTimedOutException();
}
try
{
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (task.IsCanceled)
{
throw new TaskCanceledException();
}
if ( finishedTask == timeoutTask )
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception.GetBaseException());
}
if ( task.IsCanceled )
{
throw new TaskCanceledException();
}

////return TimeoutAfter(task.ContinueWith(t => 0), timeout);
if ( task.IsFaulted )
{
throw new MqttCommunicationException( task.Exception.GetBaseException() );
}
}
finally
{
cancellationTokenSource.Cancel();
}
}
}

public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout)
public static async Task<TResult> TimeoutAfter<TResult>( this Task<TResult> task, TimeSpan timeout )
{
var timeoutTask = Task.Delay(timeout);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

if (finishedTask == timeoutTask)
{
throw new MqttCommunicationTimedOutException();
}

if (task.IsCanceled)
using ( var cancellationTokenSource = new CancellationTokenSource() )
{
throw new TaskCanceledException();
}
if (task.IsFaulted)
{
throw new MqttCommunicationException(task.Exception.GetBaseException());
}
try
{
var timeoutTask = Task.Delay(timeout, cancellationTokenSource.Token);
var finishedTask = await Task.WhenAny(timeoutTask, task).ConfigureAwait(false);

return task.Result;
if ( finishedTask == timeoutTask )
{
throw new MqttCommunicationTimedOutException();
}

//// using (var cancellationTokenSource = new CancellationTokenSource())
//// {
//// var tcs = new TaskCompletionSource<TResult>();
if ( task.IsCanceled )
{
throw new TaskCanceledException();
}

//// cancellationTokenSource.Token.Register(() =>
//// {
//// tcs.TrySetCanceled();
//// });
if ( task.IsFaulted )
{
throw new MqttCommunicationException( task.Exception.GetBaseException() );
}

//// try
//// {
////#pragma warning disable 4014
//// task.ContinueWith(t =>
////#pragma warning restore 4014
//// {
//// if (t.IsFaulted)
//// {
//// tcs.TrySetException(t.Exception);
//// }

//// if (t.IsCompleted)
//// {
//// tcs.TrySetResult(t.Result);
//// }

//// return t.Result;
//// }, cancellationTokenSource.Token).ConfigureAwait(false);

//// cancellationTokenSource.CancelAfter(timeout);
//// return await tcs.Task.ConfigureAwait(false);
//// }
//// catch (TaskCanceledException)
//// {
//// throw new MqttCommunicationTimedOutException();
//// }
//// catch (Exception e)
//// {
//// throw new MqttCommunicationException(e);
//// }
//// }
return task.Result;
}
finally
{
cancellationTokenSource.Cancel();
}
}
}
}
}

+ 19
- 0
Tests/MQTTnet.Core.Tests/ExtensionTests.cs View File

@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Core.Exceptions;
@@ -67,5 +68,23 @@ namespace MQTTnet.Core.Tests
Assert.IsTrue(e.InnerException is IndexOutOfRangeException);
}
}

[TestMethod]
public async Task TimeoutAfterMemoryUsage()
{
var tasks = Enumerable.Range(0, 100000)
.Select(i => Task.Delay(TimeSpan.FromMilliseconds(1)).TimeoutAfter(TimeSpan.FromMinutes(1)));

await Task.WhenAll( tasks );
AssertIsLess( 3_000_000, GC.GetTotalMemory( true ) );
}

private void AssertIsLess( long bound, long actual )
{
if ( bound < actual )
{
Assert.Fail( $"value must be less than {bound:N0} but is {actual:N0}" );
}
}
}
}

Loading…
Cancel
Save