ソースを参照

Add cancellation token support to Subscribers (#912)

* Add cancellation token support to Subscribers

* Fix for concurrent tests

* Working on additional tests for unique method cache key and dispatcher rework

* Makes Dispatcher a processing server
master
Matt Psaltis 3年前
committed by GitHub
コミット
2069014720
この署名に対応する既知のキーがデータベースに存在しません GPGキーID: 4AEE18F83AFDEB23
38個のファイルの変更781行の追加70行の削除
  1. +7
    -0
      CAP.sln
  2. +1
    -1
      samples/Sample.ConsoleApp/Program.cs
  3. +8
    -7
      src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.Consul.cs
  4. +3
    -2
      src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.cs
  5. +3
    -2
      src/DotNetCore.CAP.Dashboard/NodeDiscovery/IProcessingServer.Consul.cs
  6. +6
    -1
      src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs
  7. +3
    -2
      src/DotNetCore.CAP.SqlServer/Diagnostics/IProcessingServer.DiagnosticRegister.cs
  8. +7
    -7
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  9. +28
    -6
      src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs
  10. +1
    -2
      src/DotNetCore.CAP/Internal/IBootstrapper.cs
  11. +13
    -3
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  12. +4
    -1
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
  13. +2
    -1
      src/DotNetCore.CAP/Internal/IProcessingServer.cs
  14. +28
    -9
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  15. +28
    -18
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  16. +3
    -1
      src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
  17. +1
    -1
      src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
  18. +1
    -1
      src/DotNetCore.CAP/Transport/IDispatcher.cs
  19. +7
    -0
      test/DotNetCore.CAP.MultiModuleSubscriberTests/DotNetCore.CAP.MultiModuleSubscriberTests.csproj
  20. +11
    -0
      test/DotNetCore.CAP.MultiModuleSubscriberTests/SubscriberClass.cs
  21. +53
    -4
      test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
  22. +2
    -0
      test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
  23. +2
    -0
      test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs
  24. +64
    -0
      test/DotNetCore.CAP.Test/Helpers/ObservableCollectionExtensions.cs
  25. +27
    -0
      test/DotNetCore.CAP.Test/Helpers/TestBootstrapService.cs
  26. +25
    -0
      test/DotNetCore.CAP.Test/Helpers/TestHostedServiceExtensions.cs
  27. +48
    -0
      test/DotNetCore.CAP.Test/Helpers/TestLogger.cs
  28. +13
    -0
      test/DotNetCore.CAP.Test/Helpers/TestLoggingExtensions.cs
  29. +24
    -0
      test/DotNetCore.CAP.Test/Helpers/TestLoggingProvider.cs
  30. +19
    -0
      test/DotNetCore.CAP.Test/Helpers/TestMessageCollector.cs
  31. +32
    -0
      test/DotNetCore.CAP.Test/Helpers/TestServiceCollectionExtensions.cs
  32. +63
    -0
      test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs
  33. +41
    -0
      test/DotNetCore.CAP.Test/IntegrationTests/IntegrationTestBase.cs
  34. +1
    -1
      test/DotNetCore.CAP.Test/MessageTest.cs
  35. +68
    -0
      test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs
  36. +64
    -0
      test/DotNetCore.CAP.Test/SubscriberCollisionTests/NewMethodTests.cs
  37. +61
    -0
      test/DotNetCore.CAP.Test/SubscriberCollisionTests/OldMethodTests.cs
  38. +9
    -0
      test/DotNetCore.CAP.Test/SubscriberCollisionTests/SubscriberClass.cs

+ 7
- 0
CAP.sln ファイルの表示

@@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Dashboard.Auth", "samples\Sample.Dashboard.Auth\Sample.Dashboard.Auth.csproj", "{6E059983-DE89-4D53-88F5-D9083BCE257F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MultiModuleSubscriberTests", "test\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj", "{23684403-7DA8-489A-8A1E-8056D7683E18}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -178,6 +180,10 @@ Global
{6E059983-DE89-4D53-88F5-D9083BCE257F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E059983-DE89-4D53-88F5-D9083BCE257F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E059983-DE89-4D53-88F5-D9083BCE257F}.Release|Any CPU.Build.0 = Release|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -207,6 +213,7 @@ Global
{375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{54458B54-49CC-454C-82B2-4AED681D9D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{6E059983-DE89-4D53-88F5-D9083BCE257F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{23684403-7DA8-489A-8A1E-8056D7683E18} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 1
- 1
samples/Sample.ConsoleApp/Program.cs ファイルの表示

@@ -29,7 +29,7 @@ namespace Sample.ConsoleApp

var sp = container.BuildServiceProvider();

sp.GetService<IBootstrapper>().BootstrapAsync(default);
sp.GetService<IBootstrapper>().BootstrapAsync();

Console.ReadLine();
}


+ 8
- 7
src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.Consul.cs ファイルの表示

@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using Microsoft.Extensions.Logging;
@@ -21,7 +22,7 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
_options = options;
}

public IList<Node> GetNodes()
public IList<Node> GetNodes(CancellationToken cancellationToken)
{
try
{
@@ -33,11 +34,11 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
config.Address = new Uri($"http://{_options.DiscoveryServerHostName}:{_options.DiscoveryServerPort}");
});

var services = consul.Catalog.Services().GetAwaiter().GetResult();
var services = consul.Catalog.Services(cancellationToken).GetAwaiter().GetResult();

foreach (var service in services.Response)
{
var serviceInfo = consul.Catalog.Service(service.Key).GetAwaiter().GetResult();
var serviceInfo = consul.Catalog.Service(service.Key, cancellationToken).GetAwaiter().GetResult();
var node = serviceInfo.Response.SkipWhile(x => !x.ServiceTags.Contains("CAP"))
.Select(info => new Node
{
@@ -65,7 +66,7 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
}
}

public async Task RegisterNode()
public async Task RegisterNode(CancellationToken cancellationToken)
{
try
{
@@ -84,7 +85,7 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
var tags = new[] { "CAP", "Client", "Dashboard" };
if (_options.CustomTags != null && _options.CustomTags.Length > 0)
{
tags = tags.Union(this._options.CustomTags).ToArray();
tags = tags.Union(_options.CustomTags).ToArray();
}

using var consul = new ConsulClient(config =>
@@ -101,11 +102,11 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
Port = _options.CurrentNodePort,
Tags = tags,
Check = healthCheck
});
}, cancellationToken);

if (result.StatusCode == System.Net.HttpStatusCode.OK)
{
_logger.LogInformation("Consul node registe success!");
_logger.LogInformation("Consul node register success!");
}
}
catch (Exception ex)


+ 3
- 2
src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.cs ファイルの表示

@@ -2,14 +2,15 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Dashboard.NodeDiscovery
{
public interface INodeDiscoveryProvider
{
IList<Node> GetNodes();
IList<Node> GetNodes(CancellationToken cancellationToken = default);

Task RegisterNode();
Task RegisterNode(CancellationToken cancellationToken = default);
}
}

+ 3
- 2
src/DotNetCore.CAP.Dashboard/NodeDiscovery/IProcessingServer.Consul.cs ファイルの表示

@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using DotNetCore.CAP.Internal;

namespace DotNetCore.CAP.Dashboard.NodeDiscovery
@@ -14,9 +15,9 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
_discoveryProvider = discoveryProvider;
}

public void Start()
public void Start(CancellationToken stoppingToken)
{
_discoveryProvider.RegisterNode().GetAwaiter().GetResult();
_discoveryProvider.RegisterNode(stoppingToken).GetAwaiter().GetResult();
}

public void Pulse()


+ 6
- 1
src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs ファイルの表示

@@ -125,11 +125,16 @@ namespace DotNetCore.CAP.RedisStreams

return (true, await readSet.ConfigureAwait(false));
}
catch (OperationCanceledException)
{
// ignore
}
catch (Exception ex)
{
_logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}");
return (false, Array.Empty<RedisStream>());
}

return (false, Array.Empty<RedisStream>());
}

private async Task ConnectAsync()


+ 3
- 2
src/DotNetCore.CAP.SqlServer/Diagnostics/IProcessingServer.DiagnosticRegister.cs ファイルの表示

@@ -1,7 +1,8 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Diagnostics;
using System.Threading;
using DotNetCore.CAP.Internal;

namespace DotNetCore.CAP.SqlServer.Diagnostics
@@ -25,7 +26,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics
}

public void Start()
public void Start(CancellationToken stoppingToken)
{
DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver);
}


+ 7
- 7
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs ファイルの表示

@@ -17,8 +17,6 @@ namespace Microsoft.Extensions.DependencyInjection
/// </summary>
public static class ServiceCollectionExtensions
{
internal static IServiceCollection ServiceCollection;

/// <summary>
/// Adds and configures the consistence services for the consistency.
/// </summary>
@@ -32,8 +30,7 @@ namespace Microsoft.Extensions.DependencyInjection
throw new ArgumentNullException(nameof(setupAction));
}

ServiceCollection = services;

services.AddSingleton(_ => services);
services.TryAddSingleton<CapMarkerService>();

services.TryAddSingleton<ICapPublisher, CapPublisher>();
@@ -47,13 +44,15 @@ namespace Microsoft.Extensions.DependencyInjection
//Processors
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerRegister>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, IDispatcher>(sp =>
sp.GetRequiredService<IDispatcher>()));

//Queue's message processor
services.TryAddSingleton<MessageNeedToRetryProcessor>();
services.TryAddSingleton<TransportCheckProcessor>();
services.TryAddSingleton<CollectorProcessor>();

//Sender and Executors
//Sender and Executors
services.TryAddSingleton<IMessageSender, MessageSender>();
services.TryAddSingleton<IDispatcher, Dispatcher>();

@@ -72,8 +71,9 @@ namespace Microsoft.Extensions.DependencyInjection
services.Configure(setupAction);

//Startup and Hosted
services.AddSingleton<IBootstrapper, Bootstrapper>();
services.AddHostedService<Bootstrapper>();
services.AddSingleton<Bootstrapper>();
services.AddHostedService(sp => sp.GetRequiredService<Bootstrapper>());
services.AddSingleton<IBootstrapper>(sp => sp.GetRequiredService<Bootstrapper>());

return new CapBuilder(services);
}


+ 28
- 6
src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs ファイルの表示

@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

@@ -17,6 +18,7 @@ namespace DotNetCore.CAP.Internal
internal class Bootstrapper : BackgroundService, IBootstrapper
{
private readonly ILogger<Bootstrapper> _logger;
private CancellationTokenSource _cts = new CancellationTokenSource();

public Bootstrapper(
ILogger<Bootstrapper> logger,
@@ -32,23 +34,23 @@ namespace DotNetCore.CAP.Internal

private IEnumerable<IProcessingServer> Processors { get; }

public async Task BootstrapAsync(CancellationToken stoppingToken)
public async Task BootstrapAsync()
{
_logger.LogDebug("### CAP background task is starting.");

try
{
await Storage.InitializeAsync(stoppingToken);
await Storage.InitializeAsync(_cts.Token);
}
catch (Exception e)
{
_logger.LogError(e, "Initializing the storage structure failed!");
}

stoppingToken.Register(() =>
_cts.Token.Register(() =>
{
_logger.LogDebug("### CAP background task is stopping.");
foreach (var item in Processors)
{
try
@@ -71,9 +73,15 @@ namespace DotNetCore.CAP.Internal
{
foreach (var item in Processors)
{
_cts.Token.ThrowIfCancellationRequested();

try
{
item.Start();
item.Start(_cts.Token);
}
catch (OperationCanceledException)
{
// ignore
}
catch (Exception ex)
{
@@ -84,9 +92,23 @@ namespace DotNetCore.CAP.Internal
return Task.CompletedTask;
}

public override void Dispose()
{
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await BootstrapAsync(stoppingToken);
await BootstrapAsync();
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
_cts?.Cancel();
await base.StopAsync(cancellationToken);
}
}
}

+ 1
- 2
src/DotNetCore.CAP/Internal/IBootstrapper.cs ファイルの表示

@@ -1,7 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Internal
@@ -11,6 +10,6 @@ namespace DotNetCore.CAP.Internal
/// </summary>
public interface IBootstrapper
{
Task BootstrapAsync(CancellationToken stoppingToken);
Task BootstrapAsync();
}
}

+ 13
- 3
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs ファイルの表示

@@ -41,7 +41,7 @@ namespace DotNetCore.CAP.Internal
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
public ConsumerRegister(ILogger<ConsumerRegister> logger, IServiceProvider serviceProvider)
{
_logger = logger;
@@ -61,7 +61,14 @@ namespace DotNetCore.CAP.Internal
return _isHealthy;
}

public void Start()
public void Start(CancellationToken stoppingToken)
{
stoppingToken.Register(() => _cts?.Cancel());

Execute();
}

public void Execute()
{
var groupingMatches = _selector.GetCandidatesMethodsOfGroupNameGrouped();

@@ -116,10 +123,11 @@ namespace DotNetCore.CAP.Internal
{
Pulse();

_cts.Dispose();
_cts = new CancellationTokenSource();
_isHealthy = true;

Start();
Execute();
}
}

@@ -151,6 +159,8 @@ namespace DotNetCore.CAP.Internal
public void Pulse()
{
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
}

private void RegisterMessageProcessor(IConsumerClient client)


+ 4
- 1
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs ファイルの表示

@@ -7,6 +7,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text.RegularExpressions;
using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

@@ -74,8 +75,9 @@ namespace DotNetCore.CAP.Internal
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();

var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo();
var serviceCollection = provider.GetRequiredService<IServiceCollection>();

foreach (var service in ServiceCollectionExtensions.ServiceCollection
foreach (var service in serviceCollection
.Where(o => o.ImplementationType != null || o.ImplementationFactory != null))
{
var detectType = service.ImplementationType ?? service.ServiceType;
@@ -147,6 +149,7 @@ namespace DotNetCore.CAP.Internal
Name = parameter.Name,
ParameterType = parameter.ParameterType,
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
|| typeof(CancellationToken).IsAssignableFrom(parameter.ParameterType)
}).ToList();

yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters, topicClassAttribute);


+ 2
- 1
src/DotNetCore.CAP/Internal/IProcessingServer.cs ファイルの表示

@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;

namespace DotNetCore.CAP.Internal
{
@@ -13,6 +14,6 @@ namespace DotNetCore.CAP.Internal
{
void Pulse() { }

void Start();
void Start(CancellationToken stoppingToken);
}
}

+ 28
- 9
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs ファイルの表示

@@ -36,12 +36,14 @@ namespace DotNetCore.CAP.Internal
cancellationToken.ThrowIfCancellationRequested();

var methodInfo = context.ConsumerDescriptor.MethodInfo;
var reflectedType = methodInfo.ReflectedType.Name;
var reflectedTypeHandle = methodInfo.ReflectedType.TypeHandle.Value;
var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}";

_logger.LogDebug("Executing subscriber method : {0}", methodInfo.Name);

var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}";
var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
var executor = _executors.GetOrAdd(key,
x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));

using var scope = _serviceProvider.CreateScope();

@@ -54,9 +56,10 @@ namespace DotNetCore.CAP.Internal
var executeParameters = new object[parameterDescriptors.Count];
for (var i = 0; i < parameterDescriptors.Count; i++)
{
if (parameterDescriptors[i].IsFromCap)
var parameterDescriptor = parameterDescriptors[i];
if (parameterDescriptor.IsFromCap)
{
executeParameters[i] = new CapHeader(message.Headers);
executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken);
}
else
{
@@ -64,24 +67,24 @@ namespace DotNetCore.CAP.Internal
{
if (_serializer.IsJsonType(message.Value)) // use ISerializer when reading from storage, skip other objects if not Json
{
executeParameters[i] = _serializer.Deserialize(message.Value, parameterDescriptors[i].ParameterType);
executeParameters[i] = _serializer.Deserialize(message.Value, parameterDescriptor.ParameterType);
}
else
{
var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType);
var converter = TypeDescriptor.GetConverter(parameterDescriptor.ParameterType);
if (converter.CanConvertFrom(message.Value.GetType()))
{
executeParameters[i] = converter.ConvertFrom(message.Value);
}
else
{
if (parameterDescriptors[i].ParameterType.IsInstanceOfType(message.Value))
if (parameterDescriptor.ParameterType.IsInstanceOfType(message.Value))
{
executeParameters[i] = message.Value;
}
else
{
executeParameters[i] = Convert.ChangeType(message.Value, parameterDescriptors[i].ParameterType);
executeParameters[i] = Convert.ChangeType(message.Value, parameterDescriptor.ParameterType);
}
}
}
@@ -134,6 +137,22 @@ namespace DotNetCore.CAP.Internal
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}

private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
CancellationToken cancellationToken)
{
if (typeof(CancellationToken).IsAssignableFrom(parameterDescriptor.ParameterType))
{
return cancellationToken;
}

if (parameterDescriptor.ParameterType.IsAssignableFrom(typeof(CapHeader)))
{
return new CapHeader(message.Headers);
}

throw new ArgumentException(parameterDescriptor.Name);
}

protected virtual object GetInstance(IServiceProvider provider, ConsumerContext context)
{
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();


+ 28
- 18
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs ファイルの表示

@@ -15,15 +15,17 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
{
public class Dispatcher : IDispatcher, IDisposable
public class Dispatcher : IDispatcher
{
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly IMessageSender _sender;
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;

private readonly Channel<MediumMessage> _publishedChannel;
private readonly Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;
private Channel<MediumMessage> _publishedChannel;
private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;

private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
@@ -32,16 +34,23 @@ namespace DotNetCore.CAP.Processor
{
_logger = logger;
_sender = sender;
_options = options.Value;
_executor = executor;
}

public void Start(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
stoppingToken.Register(() => _cts.Cancel());

_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true });
_receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>();

Task.WhenAll(Enumerable.Range(0, options.Value.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

Task.WhenAll(Enumerable.Range(0, options.Value.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Processing(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
}

public void EnqueueToPublish(MediumMessage message)
@@ -54,16 +63,11 @@ namespace DotNetCore.CAP.Processor
_receivedChannel.Writer.TryWrite((message, descriptor));
}

public void Dispose()
{
_cts.Cancel();
}

private async Task Sending()
private async Task Sending(CancellationToken cancellationToken)
{
try
{
while (await _publishedChannel.Reader.WaitToReadAsync(_cts.Token))
while (await _publishedChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (_publishedChannel.Reader.TryRead(out var message))
{
@@ -90,17 +94,17 @@ namespace DotNetCore.CAP.Processor
}
}

private async Task Processing()
private async Task Processing(CancellationToken cancellationToken)
{
try
{
while (await _receivedChannel.Reader.WaitToReadAsync(_cts.Token))
while (await _receivedChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (_receivedChannel.Reader.TryRead(out var message))
{
try
{
await _executor.DispatchAsync(message.Item1, message.Item2, _cts.Token);
await _executor.DispatchAsync(message.Item1, message.Item2, cancellationToken);
}
catch (Exception e)
{
@@ -115,5 +119,11 @@ namespace DotNetCore.CAP.Processor
// expected
}
}

public void Dispose()
{
if (!_cts.IsCancellationRequested)
_cts.Cancel();
}
}
}

+ 3
- 1
src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs ファイルの表示

@@ -34,8 +34,10 @@ namespace DotNetCore.CAP.Processor
_cts = new CancellationTokenSource();
}

public void Start()
public void Start(CancellationToken stoppingToken)
{
stoppingToken.Register(() => _cts.Cancel());

_logger.ServerStarting();

_context = new ProcessingContext(_provider, _cts.Token);


+ 1
- 1
src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs ファイルの表示

@@ -70,7 +70,7 @@ namespace DotNetCore.CAP.Processor

foreach (var message in messages)
{
await _subscribeDispatcher.DispatchAsync(message);
await _subscribeDispatcher.DispatchAsync(message, context.CancellationToken);

await context.WaitAsync(_delay);
}


+ 1
- 1
src/DotNetCore.CAP/Transport/IDispatcher.cs ファイルの表示

@@ -6,7 +6,7 @@ using DotNetCore.CAP.Persistence;

namespace DotNetCore.CAP.Transport
{
public interface IDispatcher
public interface IDispatcher : IProcessingServer
{
void EnqueueToPublish(MediumMessage message);



+ 7
- 0
test/DotNetCore.CAP.MultiModuleSubscriberTests/DotNetCore.CAP.MultiModuleSubscriberTests.csproj ファイルの表示

@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

</Project>

+ 11
- 0
test/DotNetCore.CAP.MultiModuleSubscriberTests/SubscriberClass.cs ファイルの表示

@@ -0,0 +1,11 @@
using System;

namespace DotNetCore.CAP.MultiModuleSubscriberTests
{
public class SubscriberClass
{
public void TestSubscriber()
{
}
}
}

+ 53
- 4
test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs ファイルの表示

@@ -1,4 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
@@ -12,14 +15,15 @@ namespace DotNetCore.CAP.Test

public ConsumerServiceSelectorTest()
{
var services = new ServiceCollection();
ServiceCollectionExtensions.ServiceCollection = services;
var services = new ServiceCollection();
services.AddOptions();
services.PostConfigure<CapOptions>(x=>{});
services.AddSingleton<IServiceCollection>(_ => services);
services.AddSingleton<IConsumerServiceSelector, ConsumerServiceSelector>();
services.AddScoped<IFooTest, CandidatesFooTest>();
services.AddScoped<IBarTest, CandidatesBarTest>();
services.AddScoped<IAbstractTest, CandidatesAbstractTest>();
services.AddScoped<ICancellationTest, CancellationTokenTest>();
services.AddLogging();
_provider = services.BuildServiceProvider();
}
@@ -29,8 +33,8 @@ namespace DotNetCore.CAP.Test
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
Assert.Equal(10, candidates.Count);
Assert.Equal(13, candidates.Count);
}

[Theory]
@@ -130,6 +134,22 @@ namespace DotNetCore.CAP.Test
var bestCandidates = selector.SelectBestCandidate(topic, candidates);
Assert.Null(bestCandidates);
}

[Theory]
[InlineData("CancellationToken.NoAdditionalParameters", 1, 1)]
[InlineData("CancellationToken.OneAdditionalParameter", 2, 2)]
[InlineData("CancellationToken.CommonArrangement", 3, 2)]
public void CanFindTopicSuccessfully(string topic, int parameterCount, int fromCapCount)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();

var bestCandidate = selector.SelectBestCandidate(topic, candidates);
Assert.NotEqual(0, candidates.Count);
Assert.NotNull(bestCandidate);
Assert.Equal(bestCandidate.Parameters.Count, parameterCount);
Assert.Equal(fromCapCount, bestCandidate.Parameters.Count(x => x.IsFromCap));
}
}

public class CandidatesTopic : TopicAttribute
@@ -151,6 +171,10 @@ namespace DotNetCore.CAP.Test
{
}

public interface ICancellationTest
{
}

[CandidatesTopic("Candidates")]
public class CandidatesFooTest : IFooTest, ICapSubscribe
{
@@ -244,4 +268,29 @@ namespace DotNetCore.CAP.Test
public class CandidatesAbstractTest : CandidatesAbstractBaseTest
{
}
[CandidatesTopic("Candidates")]
public class CancellationTokenTest : ICancellationTest, ICapSubscribe
{
[CandidatesTopic("CancellationToken.NoAdditionalParameters")]
public Task NoAdditionalParameters(CancellationToken cancellationToken)
{
Console.WriteLine($"{nameof(NoAdditionalParameters)} method has been executed.");
return Task.CompletedTask;
}

[CandidatesTopic("CancellationToken.OneAdditionalParameter")]
public Task OneAdditionalParameter([FromCap] CapHeader headers, CancellationToken cancellationToken)
{
Console.WriteLine($"{nameof(OneAdditionalParameter)} method has been executed.");
return Task.CompletedTask;
}

[CandidatesTopic("CancellationToken.CommonArrangement")]
public Task CommonArrangement(DateTime date, [FromCap] IDictionary<string, string> headers, CancellationToken cancellationToken)
{
Console.WriteLine($"{nameof(CommonArrangement)} method has been executed.");
return Task.CompletedTask;
}
}
}

+ 2
- 0
test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj ファイルの表示

@@ -27,7 +27,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
<ProjectReference Include="..\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj" />
</ItemGroup>

</Project>

+ 2
- 0
test/DotNetCore.CAP.Test/FakeInMemoryQueue/InMemoryConsumerClient.cs ファイルの表示

@@ -68,6 +68,8 @@ namespace DotNetCore.CAP.Test.FakeInMemoryQueue

private void OnConsumerReceived(TransportMessage e)
{
var headers = e.Headers;
headers.TryAdd(Messages.Headers.Group, _subscriptionName);
OnMessageReceived?.Invoke(null, e);
}
#endregion private methods


+ 64
- 0
test/DotNetCore.CAP.Test/Helpers/ObservableCollectionExtensions.cs ファイルの表示

@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace DotNetCore.CAP.Test.Helpers
{
public static class ObservableCollectionExtensions
{
public static async Task WaitOneMessage<T>(this ObservableCollection<T> collection,
CancellationToken cancellationToken)
{
await WaitForMessages(collection,
x => x.Count() == 1,
cancellationToken);
}

public static async Task WaitForMessages<T>(this ObservableCollection<T> collection,
Func<IEnumerable<T>, bool> comparison,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var cts = new CancellationTokenSource();
cancellationToken.Register(() => cts.Cancel());

await Task.Run(async () =>
{
void OnCollectionChanged(object sender, NotifyCollectionChangedEventArgs args)
{
if (comparison(collection))
{
cts.Cancel();
}
}

collection.CollectionChanged += OnCollectionChanged;
if (collection.Count > 0)
{
OnCollectionChanged(collection, default);
}

try
{
await Task.Delay(-1, cts.Token);
}
catch (TaskCanceledException)
{
}
finally
{
collection.CollectionChanged -= OnCollectionChanged;
}

cancellationToken.ThrowIfCancellationRequested();

}, cancellationToken);
}
}
}

+ 27
- 0
test/DotNetCore.CAP.Test/Helpers/TestBootstrapService.cs ファイルの表示

@@ -0,0 +1,27 @@
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Hosting;

namespace DotNetCore.CAP.Test.Helpers
{
public class TestBootstrapService : IHostedService
{
private readonly IBootstrapper _bootstrapper;

public TestBootstrapService(IBootstrapper bootstrapper)
{
_bootstrapper = bootstrapper;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _bootstrapper.BootstrapAsync();
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 25
- 0
test/DotNetCore.CAP.Test/Helpers/TestHostedServiceExtensions.cs ファイルの表示

@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace DotNetCore.CAP.Test.Helpers
{
public static class TestHostedServiceExtensions
{
public static void StartHostedServices(this IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
Task.Run(async () =>
{
var hostedServices = serviceProvider.GetRequiredService<IEnumerable<IHostedService>>();
foreach (var hostedService in hostedServices)
{
await hostedService.StartAsync(cancellationToken);
}
}, cancellationToken)
.GetAwaiter().GetResult();
}
}
}

+ 48
- 0
test/DotNetCore.CAP.Test/Helpers/TestLogger.cs ファイルの表示

@@ -0,0 +1,48 @@
using System;
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.Helpers
{
public class TestLogger : ILogger
{
private readonly ITestOutputHelper _outputHelper;

public TestLogger(ITestOutputHelper outputHelper, string categoryName)
{
_outputHelper = outputHelper;
CategoryName = categoryName;
}

public string CategoryName { get; }

public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
_outputHelper.WriteLine($"[{logLevel}] {formatter.Invoke(state, exception)}");
}

public bool IsEnabled(LogLevel logLevel)
{
return true;
}

public IDisposable BeginScope<TState>(TState state)
{
return new DisposableAction(state);
}

private class DisposableAction : IDisposable
{
private readonly object _state;

public DisposableAction(object state)
{
_state = state;
}

public void Dispose()
{
}
}
}
}

+ 13
- 0
test/DotNetCore.CAP.Test/Helpers/TestLoggingExtensions.cs ファイルの表示

@@ -0,0 +1,13 @@
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.Helpers
{
public static class TestLoggingExtensions
{
public static void AddTestLogging(this ILoggingBuilder builder, ITestOutputHelper outputHelper)
{
builder.AddProvider(new TestLoggingProvider(outputHelper));
}
}
}

+ 24
- 0
test/DotNetCore.CAP.Test/Helpers/TestLoggingProvider.cs ファイルの表示

@@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.Helpers
{
public class TestLoggingProvider : ILoggerProvider
{
private readonly ITestOutputHelper _outputHelper;

public TestLoggingProvider(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}

public void Dispose()
{
}

public ILogger CreateLogger(string categoryName)
{
return new TestLogger(_outputHelper, categoryName);
}
}
}

+ 19
- 0
test/DotNetCore.CAP.Test/Helpers/TestMessageCollector.cs ファイルの表示

@@ -0,0 +1,19 @@
using System.Collections.Generic;

namespace DotNetCore.CAP.Test.Helpers
{
public class TestMessageCollector
{
private readonly ICollection<object> _handledMessages;

public TestMessageCollector(ICollection<object> handledMessages)
{
_handledMessages = handledMessages;
}

public void Add(object data)
{
_handledMessages.Add(data);
}
}
}

+ 32
- 0
test/DotNetCore.CAP.Test/Helpers/TestServiceCollectionExtensions.cs ファイルの表示

@@ -0,0 +1,32 @@
using System;
using System.Threading;
using DotNetCore.CAP.Test.FakeInMemoryQueue;
using Microsoft.Extensions.DependencyInjection;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.Helpers
{
public static class TestServiceCollectionExtensions
{
public const string TestGroupName = "Test";

public static void AddTestSetup(this IServiceCollection services, ITestOutputHelper testOutput)
{
services.AddLogging(x => x.AddTestLogging(testOutput));
services.AddCap(x =>
{
x.DefaultGroupName = TestGroupName;
x.UseFakeTransport();
x.UseInMemoryStorage();
});
services.AddHostedService<TestBootstrapService>();
}

public static ServiceProvider BuildTestContainer(this IServiceCollection services, CancellationToken cancellationToken)
{
var container = services.BuildServiceProvider();
container.StartHostedServices(cancellationToken);
return container;
}
}
}

+ 63
- 0
test/DotNetCore.CAP.Test/IntegrationTests/CancellationTokenSubscriberTest.cs ファイルの表示

@@ -0,0 +1,63 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Test.Helpers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.IntegrationTests
{
public class CancellationTokenSubscriberTest : IntegrationTestBase
{
public CancellationTokenSubscriberTest(ITestOutputHelper testOutput)
: base(testOutput)
{
}

[Fact]
public async Task Execute()
{
await Publisher.PublishAsync(nameof(CancellationTokenSubscriberTest), "Test Message");
await HandledMessages.WaitOneMessage(CancellationToken);

// Explicitly stop Bootstrapper to prove the cancellation token works.
var bootstrapper = Container.GetRequiredService<Bootstrapper>();
await bootstrapper.StopAsync(CancellationToken.None);

var (message, token) = HandledMessages
.OfType<(string Message, CancellationToken Token)>()
.Single();

Assert.Equal("Test Message", message);
Assert.True(token.IsCancellationRequested);
}

protected override void ConfigureServices(IServiceCollection services)
{
services.AddTransient<TestEventSubscriber>();
}

private class TestEventSubscriber : ICapSubscribe
{
private readonly TestMessageCollector _collector;
private readonly ILogger<TestEventSubscriber> _logger;

public TestEventSubscriber(ILogger<TestEventSubscriber> logger, TestMessageCollector collector)
{
_logger = logger;
_collector = collector;
}

[CapSubscribe(nameof(CancellationTokenSubscriberTest),
Group = TestServiceCollectionExtensions.TestGroupName)]
public void Handle(string message, CancellationToken cancellationToken)
{
_logger.LogWarning($"{nameof(Handle)} method called. {message}");
_collector.Add((message, cancellationToken));
}
}
}
}

+ 41
- 0
test/DotNetCore.CAP.Test/IntegrationTests/IntegrationTestBase.cs ファイルの表示

@@ -0,0 +1,41 @@
using System;
using System.Collections.ObjectModel;
using System.Threading;
using DotNetCore.CAP.Test.Helpers;
using Microsoft.Extensions.DependencyInjection;
using Xunit.Abstractions;

namespace DotNetCore.CAP.Test.IntegrationTests
{
public abstract class IntegrationTestBase : IDisposable
{
protected readonly CancellationTokenSource CancellationTokenSource = new(TimeSpan.FromSeconds(10));
protected readonly ServiceProvider Container;
protected readonly ObservableCollection<object> HandledMessages = new();
protected readonly ICapPublisher Publisher;

protected IntegrationTestBase(ITestOutputHelper testOutput)
{
var services = new ServiceCollection();
services.AddTestSetup(testOutput);
services.AddSingleton(sp => new TestMessageCollector(HandledMessages));
ConfigureServices(services);

Container = services.BuildTestContainer(CancellationToken);
Scope = Container.CreateScope();
Publisher = Scope.ServiceProvider.GetRequiredService<ICapPublisher>();
}

protected IServiceScope Scope { get; }

protected CancellationToken CancellationToken => CancellationTokenSource.Token;

public void Dispose()
{
Scope?.Dispose();
Container?.Dispose();
}

protected abstract void ConfigureServices(IServiceCollection services);
}
}

+ 1
- 1
test/DotNetCore.CAP.Test/MessageTest.cs ファイルの表示

@@ -14,9 +14,9 @@ namespace DotNetCore.CAP.Test
public MessageTest()
{
var services = new ServiceCollection();
ServiceCollectionExtensions.ServiceCollection = services;
services.AddOptions();
services.AddSingleton<IServiceCollection>(_ => services);
services.AddSingleton<ISerializer, JsonUtf8Serializer>();
_provider = services.BuildServiceProvider();
}


+ 68
- 0
test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs ファイルの表示

@@ -0,0 +1,68 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace DotNetCore.CAP.Test
{
public class SubscribeInvokerWithCancellation
{
private readonly IServiceProvider _serviceProvider;

public SubscribeInvokerWithCancellation()
{
var serviceCollection = new ServiceCollection();
serviceCollection.AddLogging();
serviceCollection.AddSingleton<IBootstrapper, Bootstrapper>();
serviceCollection.AddSingleton<ISerializer, JsonUtf8Serializer>();
serviceCollection.AddSingleton<ISubscribeInvoker, SubscribeInvoker>();
_serviceProvider = serviceCollection.BuildServiceProvider();
}

private ISubscribeInvoker SubscribeInvoker => _serviceProvider.GetService<ISubscribeInvoker>();

[Fact]
public async Task InvokeTest()
{
var descriptor = new ConsumerExecutorDescriptor()
{
Attribute = new CandidatesTopic("fake.output.withcancellation"),
ServiceTypeInfo = typeof(FakeSubscriberWithCancellation).GetTypeInfo(),
ImplTypeInfo = typeof(FakeSubscriberWithCancellation).GetTypeInfo(),
MethodInfo = typeof(FakeSubscriberWithCancellation)
.GetMethod(nameof(FakeSubscriberWithCancellation.CancellationTokenInjected)),
Parameters = new List<ParameterDescriptor>
{
new ParameterDescriptor {
ParameterType = typeof(CancellationToken),
IsFromCap = true,
Name = "cancellationToken"
}
}
};

var header = new Dictionary<string, string>();
var message = new Message(header, null);
var context = new ConsumerContext(descriptor, message);

var cancellationToken = new CancellationToken();
var ret = await SubscribeInvoker.InvokeAsync(context, cancellationToken);
Assert.Equal(cancellationToken, ret.Result);
}
}

public class FakeSubscriberWithCancellation : ICapSubscribe
{
[CapSubscribe("fake.output.withcancellation")]
public CancellationToken CancellationTokenInjected(CancellationToken cancellationToken)
{
return cancellationToken;
}
}
}

+ 64
- 0
test/DotNetCore.CAP.Test/SubscriberCollisionTests/NewMethodTests.cs ファイルの表示

@@ -0,0 +1,64 @@
using Xunit;
using ExternalModuleSubscriberClass = DotNetCore.CAP.MultiModuleSubscriberTests.SubscriberClass;

namespace DotNetCore.CAP.Test.SubscriberCollisionTests
{
public class NewMethodTests
{
[Fact]
public void NoCollision_SameClassAndMethod_DifferentAssemblies()
{
var methodInfo = typeof(SubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));
var externalMethodInfo = typeof(ExternalModuleSubscriberClass).GetMethod(nameof(ExternalModuleSubscriberClass.TestSubscriber));

Assert.NotEqual(methodInfo.MethodHandle, externalMethodInfo.MethodHandle);
}

[Fact]
public void Collision_Subclass_SameAssembly_MethodHandleOnly()
{
var methodInfo1 = typeof(Subclass1OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));
var methodInfo2 = typeof(Subclass2OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));

Assert.Equal(methodInfo1.MethodHandle.Value,
methodInfo2.MethodHandle.Value);
}

[Fact]
public void NoCollision_Subclass_SameAssembly_TypeAndMethodHandle()
{
var methodInfo1 = typeof(Subclass1OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));
var methodInfo2 = typeof(Subclass2OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));

Assert.NotEqual($"{methodInfo1.MethodHandle.Value}_{methodInfo1.ReflectedType.TypeHandle.Value}",
$"{methodInfo2.MethodHandle.Value}_{methodInfo2.ReflectedType.TypeHandle.Value}");
}

[Fact]
public void NoCollision_SubclassOfGenericOpenType_SameAssembly_Handle()
{
var methodInfo1 = typeof(BaseClass<>)
.MakeGenericType(typeof(MessageType1))
.GetMethod(nameof(BaseClass<object>.Handle));
var methodInfo2 = typeof(BaseClass<>)
.MakeGenericType(typeof(MessageType2))
.GetMethod(nameof(BaseClass<object>.Handle));

Assert.NotEqual($"{methodInfo1.MethodHandle.Value}_{methodInfo1.ReflectedType.TypeHandle.Value}",
$"{methodInfo2.MethodHandle.Value}_{methodInfo2.ReflectedType.TypeHandle.Value}");
}

private class Subclass1OfSubscriberClass : SubscriberClass { }
private class Subclass2OfSubscriberClass : SubscriberClass { }

private class MessageType1 { }
private class MessageType2 { }
private abstract class BaseClass<T>
{
public void Handle()
{
}
}

}
}

+ 61
- 0
test/DotNetCore.CAP.Test/SubscriberCollisionTests/OldMethodTests.cs ファイルの表示

@@ -0,0 +1,61 @@
using Xunit;
using ExternalModuleSubscriberClass = DotNetCore.CAP.MultiModuleSubscriberTests.SubscriberClass;

namespace DotNetCore.CAP.Test.SubscriberCollisionTests
{
public class OldMethodTests
{
[Fact]
public void NoCollision_SameClassAndMethod_DifferentAssemblies()
{
var methodInfo = typeof(SubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));
var externalMethodInfo = typeof(ExternalModuleSubscriberClass).GetMethod(nameof(ExternalModuleSubscriberClass.TestSubscriber));

var reflectedType = methodInfo.ReflectedType.Name;
var key = $"{methodInfo.Module.Name}_{reflectedType}_{methodInfo.MetadataToken}";
var externalReflectedType = methodInfo.ReflectedType.Name;
var externalKey = $"{externalMethodInfo.Module.Name}_{externalReflectedType}_{externalMethodInfo.MetadataToken}";

Assert.NotEqual(key, externalKey);
}

[Fact]
public void NoCollision_Subclasses_SameAssembly()
{
var methodInfo1 = typeof(Subclass1OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));
var methodInfo2 = typeof(Subclass2OfSubscriberClass).GetMethod(nameof(SubscriberClass.TestSubscriber));

var reflectedType = methodInfo1.ReflectedType.Name;
var key = $"{methodInfo1.Module.Name}_{reflectedType}_{methodInfo1.MetadataToken}";
var externalReflectedType = methodInfo2.ReflectedType.Name;
var externalKey = $"{methodInfo2.Module.Name}_{externalReflectedType}_{methodInfo2.MetadataToken}";

Assert.NotEqual(key, externalKey);
}

[Fact]
public void Collision_SubclassOfGenericOpenType_SameAssembly_Handle()
{
var method1 = typeof(BaseClass<>)
.MakeGenericType(typeof(MessageType1))
.GetMethod(nameof(BaseClass<object>.Handle));
var method2 = typeof(BaseClass<>)
.MakeGenericType(typeof(MessageType2))
.GetMethod(nameof(BaseClass<object>.Handle));

Assert.Equal(method1.MetadataToken, method2.MetadataToken);
}

private class Subclass1OfSubscriberClass : SubscriberClass {}
private class Subclass2OfSubscriberClass : SubscriberClass {}
private class MessageType1 { }
private class MessageType2 { }
private abstract class BaseClass<T>
{
public void Handle()
{
}
}
}
}

+ 9
- 0
test/DotNetCore.CAP.Test/SubscriberCollisionTests/SubscriberClass.cs ファイルの表示

@@ -0,0 +1,9 @@
namespace DotNetCore.CAP.Test.SubscriberCollisionTests
{
public class SubscriberClass
{
public void TestSubscriber()
{
}
}
}

読み込み中…
キャンセル
保存