ソースを参照

Added possibility to process messages for each consumer group independently (#1034)

* Clarifying the behavior of Subscription Group

* Improved logging informations

* Added DispatcherPerGroup implementation

* Added Sample.RabbitMQ.SqlServer.DispatcherPerGroup

* Renaming receivers in sample

* Updated documentation
master
Dariusz Lenartowicz 3年前
committed by GitHub
コミット
f034c1e502
この署名に対応する既知のキーがデータベースに存在しません GPGキーID: 4AEE18F83AFDEB23
22個のファイルの変更649行の追加15行の削除
  1. +7
    -0
      CAP.sln
  2. +2
    -1
      README.md
  3. +8
    -2
      docs/content/user-guide/en/cap/configuration.md
  4. +35
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Controllers/HomeController.cs
  5. +12
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/TestMessage.cs
  6. +25
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/VeryFastProcessingReceiver.cs
  7. +25
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/XSlowProcessingReceiver.cs
  8. +58
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs
  9. +26
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj
  10. +52
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs
  11. +4
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandler.cs
  12. +15
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlerTopicAttribute.cs
  13. +31
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlersExtensions.cs
  14. +87
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/TypedConsumerServiceSelector.cs
  15. +10
    -0
      samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/appsettings.json
  16. +1
    -0
      src/DotNetCore.CAP/CAP.Builder.cs
  17. +7
    -0
      src/DotNetCore.CAP/CAP.Options.cs
  18. +12
    -2
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  19. +13
    -6
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  20. +4
    -4
      src/DotNetCore.CAP/Internal/LoggerExtensions.cs
  21. +2
    -0
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  22. +213
    -0
      src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs

+ 7
- 0
CAP.sln ファイルの表示

@@ -82,6 +82,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -196,6 +198,10 @@ Global
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.Build.0 = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -228,6 +234,7 @@ Global
{23684403-7DA8-489A-8A1E-8056D7683E18} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 2
- 1
README.md ファイルの表示

@@ -227,7 +227,8 @@ public void ShowTime2(DateTime datetime)
}

```
`ShowTime1` and `ShowTime2` will be called at the same time.
`ShowTime1` and `ShowTime2` will be called one after another because all received messages are processed linear.
You can change that behaviour increasing `ConsumerThreadCount`.

BTW, You can specify the default group name in the configuration:



+ 8
- 2
docs/content/user-guide/en/cap/configuration.md ファイルの表示

@@ -93,7 +93,7 @@ The interval of the collector processor deletes expired messages.

#### ConsumerThreadCount

> Default : 1
> Default: 1

Number of consumer threads, when this value is greater than 1, the order of message execution cannot be guaranteed.

@@ -115,4 +115,10 @@ Failure threshold callback. This action is called when the retry reaches the val

> Default: 24*3600 sec (1 days)

The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.
The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.

#### UseDispatchingPerGroup

> Default: false

If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.

+ 35
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Controllers/HomeController.cs ファイルの表示

@@ -0,0 +1,35 @@
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Data.SqlClient;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages;
using System;
using System.Threading.Tasks;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Controllers
{
public class HomeController : Controller
{
private readonly ICapPublisher _capPublisher;

public HomeController(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}

public async Task<IActionResult> Index()
{
await using (var connection = new SqlConnection("Server=(local);Database=CAP-Test;Trusted_Connection=True;"))
{
using var transaction = connection.BeginTransaction(_capPublisher);
// This is where you would do other work that is going to persist data to your database

var message = TestMessage.Create($"This is message text created at {DateTime.Now:O}.");

await _capPublisher.PublishAsync(typeof(TestMessage).FullName, message);
transaction.Commit();
}

return Content("ok");
}
}
}

+ 12
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/TestMessage.cs ファイルの表示

@@ -0,0 +1,12 @@
namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
public class TestMessage
{
public static TestMessage Create(string text) => new()
{
Text = text
};

public string Text { get; private init; }
}
}

+ 25
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/VeryFastProcessingReceiver.cs ファイルの表示

@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
[QueueHandlerTopic("fasttopic")]
public class VeryFastProcessingReceiver : QueueHandler
{
private readonly ILogger<VeryFastProcessingReceiver> _logger;

public VeryFastProcessingReceiver(ILogger<VeryFastProcessingReceiver> logger)
{
_logger = logger;
}

public async Task Handle(TestMessage value)
{
_logger.LogInformation($"Starting FAST processing handler {DateTime.Now:O}: {value.Text}");
await Task.Delay(50);
_logger.LogInformation($"Ending FAST processing handler {DateTime.Now:O}: {value.Text}");
}
}
}

+ 25
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/XSlowProcessingReceiver.cs ファイルの表示

@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
[QueueHandlerTopic("slowtopic")]
public class XSlowProcessingReceiver : QueueHandler
{
private readonly ILogger<XSlowProcessingReceiver> _logger;

public XSlowProcessingReceiver(ILogger<XSlowProcessingReceiver> logger)
{
_logger = logger;
}

public async Task Handle(TestMessage value)
{
_logger.LogInformation($"Starting SLOW processing handler {DateTime.Now:O}: {value.Text}");
await Task.Delay(10000);
_logger.LogInformation($"Ending SLOW processing handler {DateTime.Now:O}: {value.Text}");
}
}
}

+ 58
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs ファイルの表示

@@ -0,0 +1,58 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
using System;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup
{
public class Program
{
public static int Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Debug()
#if DEBUG
.WriteTo.Seq("http://localhost:5341")
#endif
.CreateLogger();

try
{
Log.Information("Starting host...");
CreateHostBuilder(args).Build().Run();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex.InnerException ?? ex, "Host terminated unexpectedly");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration((context, builder) =>
{
builder
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true);
})
.UseSerilog((context, configuration) =>
{
configuration.ReadFrom.Configuration(context.Configuration);
}, true, true)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}

+ 26
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj ファイルの表示

@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<LangVersion>latest</LangVersion>
<AspNetCoreHostingModel>OutOfProcess</AspNetCoreHostingModel>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.4" />

<PackageReference Include="Serilog.AspNetCore" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 52
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs ファイルの表示

@@ -0,0 +1,52 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;
using Serilog;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddLogging(x => x.AddSerilog());

services
.AddSingleton<IConsumerServiceSelector, TypedConsumerServiceSelector>()
.AddQueueHandlers(typeof(Startup).Assembly);

services.AddCap(options =>
{
options.UseSqlServer("Server=(local);Database=CAP-Test;Trusted_Connection=True;");
options.UseRabbitMQ("localhost");
options.UseDashboard();
options.GroupNamePrefix = "th";
options.ConsumerThreadCount = 1;

options.UseDispatchingPerGroup = true;
});

services.AddControllersWithViews();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseDeveloperExceptionPage();
app.UseSerilogRequestLogging();
app.UseCapDashboard();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllerRoute(
name: "default",
pattern: "{controller=Home}/{action=Index}/{id?}");
});
}
}
}

+ 4
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandler.cs ファイルの表示

@@ -0,0 +1,4 @@
namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
public abstract class QueueHandler { }
}

+ 15
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlerTopicAttribute.cs ファイルの表示

@@ -0,0 +1,15 @@
using System;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
[AttributeUsage(AttributeTargets.Class)]
public class QueueHandlerTopicAttribute : Attribute
{
public string Topic { get; }

public QueueHandlerTopicAttribute(string topic)
{
Topic = topic;
}
}
}

+ 31
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlersExtensions.cs ファイルの表示

@@ -0,0 +1,31 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Linq;
using System.Reflection;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
internal static class QueueHandlersExtensions
{
private static readonly Type queueHandlerType = typeof(QueueHandler);

public static IServiceCollection AddQueueHandlers(this IServiceCollection services, params Assembly[] assemblies)
{
assemblies ??= new[] { Assembly.GetEntryAssembly() };

foreach (var type in assemblies.Distinct().SelectMany(x => x.GetTypes().Where(FilterHandlers)))
{
services.AddTransient(queueHandlerType, type);
}

return services;
}

private static bool FilterHandlers(Type t)
{
var topic = t.GetCustomAttribute<QueueHandlerTopicAttribute>();

return queueHandlerType.IsAssignableFrom(t) && topic != null && t.IsClass && !t.IsAbstract;
}
}
}

+ 87
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/TypedConsumerServiceSelector.cs ファイルの表示

@@ -0,0 +1,87 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
internal class TypedConsumerServiceSelector : ConsumerServiceSelector
{
private readonly CapOptions _capOptions;

public TypedConsumerServiceSelector(IServiceProvider serviceProvider) : base(serviceProvider)
{
_capOptions = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value;
}

protected override IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(IServiceProvider provider)
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(30);

using var scoped = provider.CreateScope();
var consumerServices = scoped.ServiceProvider.GetServices<QueueHandler>();
foreach (var service in consumerServices)
{
var typeInfo = service.GetType().GetTypeInfo();
if (!typeof(QueueHandler).GetTypeInfo().IsAssignableFrom(typeInfo))
{
continue;
}

executorDescriptorList.AddRange(GetMyDescription(typeInfo));
}

return executorDescriptorList;
}

private IEnumerable<ConsumerExecutorDescriptor> GetMyDescription(TypeInfo typeInfo)
{
var method = typeInfo.DeclaredMethods.FirstOrDefault(x => x.Name == "Handle");
if (method == null) yield break;

var topicAttr = typeInfo.GetCustomAttributes<QueueHandlerTopicAttribute>(true);
var topicAttributes = topicAttr as IList<QueueHandlerTopicAttribute> ?? topicAttr.ToList();

if (topicAttributes.Count == 0) yield break;

foreach (var attr in topicAttributes)
{
var topic = attr.Topic == null
? _capOptions.DefaultGroupName + "." + _capOptions.Version
: attr.Topic + "." + _capOptions.Version;

if (!string.IsNullOrEmpty(_capOptions.GroupNamePrefix))
{
topic = $"{_capOptions.GroupNamePrefix}.{topic}";
}

var parameters = method.GetParameters().Select(p => new ParameterDescriptor
{
Name = p.Name,
ParameterType = p.ParameterType,
IsFromCap = p.GetCustomAttributes(typeof(FromCapAttribute)).Any()
}).ToList();

var capName = parameters.FirstOrDefault(x => !x.IsFromCap)?.ParameterType.FullName;
if (string.IsNullOrEmpty(capName)) continue;

yield return new ConsumerExecutorDescriptor
{
Attribute = new CapSubscribeAttribute(capName)
{
Group = topic
},
Parameters = parameters,
MethodInfo = method,
ImplTypeInfo = typeInfo,
TopicNamePrefix = _capOptions.TopicNamePrefix,
ServiceTypeInfo = typeInfo
};
}
}
}
}

+ 10
- 0
samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/appsettings.json ファイルの表示

@@ -0,0 +1,10 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
}

+ 1
- 0
src/DotNetCore.CAP/CAP.Builder.cs ファイルの表示

@@ -6,6 +6,7 @@ using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;


+ 7
- 0
src/DotNetCore.CAP/CAP.Options.cs ファイルの表示

@@ -27,6 +27,7 @@ namespace DotNetCore.CAP
Version = "v1";
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower();
CollectorCleaningInterval = 300;
UseDispatchingPerGroup = false;
}

internal IList<ICapOptionsExtension> Extensions { get; }
@@ -80,6 +81,12 @@ namespace DotNetCore.CAP
/// </summary>
public int ConsumerThreadCount { get; set; }

/// <summary>
/// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as <see cref="ConsumerThreadCount"/> value is.
/// Default is false.
/// </summary>
public bool UseDispatchingPerGroup { get; set; }

/// <summary>
/// The number of producer thread connections.
/// Default is 1


+ 12
- 2
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs ファイルの表示

@@ -51,9 +51,8 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<TransportCheckProcessor>();
services.TryAddSingleton<CollectorProcessor>();

//Sender and Executors
//Sender
services.TryAddSingleton<IMessageSender, MessageSender>();
services.TryAddSingleton<IDispatcher, Dispatcher>();

services.TryAddSingleton<ISerializer, JsonUtf8Serializer>();

@@ -63,6 +62,17 @@ namespace Microsoft.Extensions.DependencyInjection
//Options and extension service
var options = new CapOptions();
setupAction(options);

//Executors
if (options.UseDispatchingPerGroup)
{
services.TryAddSingleton<IDispatcher, DispatcherPerGroup>();
}
else
{
services.TryAddSingleton<IDispatcher, Dispatcher>();
}

foreach (var serviceExtension in options.Extensions)
{
serviceExtension.AddServices(services);


+ 13
- 6
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs ファイルの表示

@@ -45,7 +45,7 @@ namespace DotNetCore.CAP.Internal

public Task<OperateResult> DispatchAsync(MediumMessage message, CancellationToken cancellationToken)
{
var selector = _provider.GetService<MethodMatcherCache>();
var selector = _provider.GetRequiredService<MethodMatcherCache>();
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor))
{
var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." +
@@ -66,13 +66,13 @@ namespace DotNetCore.CAP.Internal
OperateResult result;
do
{
var executedResult = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken);
result = executedResult.Item2;
var (shouldRetry, operateResult) = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken);
result = operateResult;
if (result == OperateResult.Success)
{
return result;
}
retry = executedResult.Item1;
retry = shouldRetry;
} while (retry);

return result;
@@ -89,7 +89,10 @@ namespace DotNetCore.CAP.Internal

try
{
_logger.ConsumerExecuting(descriptor.MethodInfo.Name);
_logger.ConsumerExecuting(
descriptor.ImplTypeInfo.Name,
descriptor.MethodInfo.Name,
descriptor.Attribute.Group ?? _options.DefaultGroupName);

var sp = Stopwatch.StartNew();

@@ -99,7 +102,11 @@ namespace DotNetCore.CAP.Internal

await SetSuccessfulState(message);

_logger.ConsumerExecuted(descriptor.MethodInfo.Name, sp.Elapsed.TotalMilliseconds);
_logger.ConsumerExecuted(
descriptor.ImplTypeInfo.Name,
descriptor.MethodInfo.Name,
descriptor.Attribute.Group ?? _options.DefaultGroupName,
sp.Elapsed.TotalMilliseconds);

return (false, OperateResult.Success);
}


+ 4
- 4
src/DotNetCore.CAP/Internal/LoggerExtensions.cs ファイルの表示

@@ -45,14 +45,14 @@ namespace DotNetCore.CAP.Internal
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}

public static void ConsumerExecuting(this ILogger logger, string methodName)
public static void ConsumerExecuting(this ILogger logger, string className, string methodName, string group)
{
logger.LogInformation($"Executing subscriber method '{methodName}'");
logger.LogInformation($"Executing subscriber method '{className}.{methodName}' on group '{group}'");
}

public static void ConsumerExecuted(this ILogger logger, string methodName, double milliseconds)
public static void ConsumerExecuted(this ILogger logger, string className, string methodName, string group, double milliseconds)
{
logger.LogInformation($"Executed subscriber method '{methodName}' in {milliseconds} ms");
logger.LogInformation($"Executed subscriber method '{className}.{methodName}' on group '{group}' in {milliseconds} ms");
}

public static void ServerStarting(this ILogger logger)


+ 2
- 0
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs ファイルの表示

@@ -66,6 +66,8 @@ namespace DotNetCore.CAP.Processor

Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Processing(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

_logger.LogInformation("Starting default Dispatcher");
}

public void EnqueueToPublish(MediumMessage message)


+ 213
- 0
src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs ファイルの表示

@@ -0,0 +1,213 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
{
internal class DispatcherPerGroup : IDispatcher
{
private readonly IMessageSender _sender;
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

private Channel<MediumMessage> _publishedChannel;
// private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor)>> _receivedChannels;
private CancellationToken _stoppingToken;

public DispatcherPerGroup(
ILogger<Dispatcher> logger,
IMessageSender sender,
IOptions<CapOptions> options,
ISubscribeDispatcher executor)
{
_logger = logger;
_sender = sender;
_options = options.Value;
_executor = executor;
}

public void Start(CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;
_stoppingToken.ThrowIfCancellationRequested();
_stoppingToken.Register(() => _cts.Cancel());

var capacity = _options.ProducerThreadCount * 500;
_publishedChannel = Channel.CreateBounded<MediumMessage>(new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ProducerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

_receivedChannels = new ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor)>>(_options.ConsumerThreadCount, _options.ConsumerThreadCount * 2);
GetOrCreateReceiverChannel(_options.DefaultGroupName);

_logger.LogInformation("Starting DispatcherPerGroup");
}

public void EnqueueToPublish(MediumMessage message)
{
try
{
if (!_publishedChannel.Writer.TryWrite(message))
{
while (_publishedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (_publishedChannel.Writer.TryWrite(message))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor)
{
try
{
var group = descriptor.Attribute.Group ?? _options.DefaultGroupName;

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Enqueue message for group {ConsumerGroup}", group);
}

var channel = GetOrCreateReceiverChannel(group);

if (!channel.Writer.TryWrite((message, descriptor)))
{
while (channel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult())
{
if (channel.Writer.TryWrite((message, descriptor)))
{
return;
}
}
}
}
catch (OperationCanceledException)
{
//Ignore
}
}

private Channel<(MediumMessage, ConsumerExecutorDescriptor)> GetOrCreateReceiverChannel(string key)
{
return _receivedChannels.GetOrAdd(key, group =>
{
_logger.LogInformation("Creating receiver channel for group {ConsumerGroup} with thread count {ConsumerThreadCount}", group, _options.ConsumerThreadCount);

var capacity = _options.ConsumerThreadCount * 300;
var channel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor)>(new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity)
{
AllowSynchronousContinuations = true,
SingleReader = _options.ConsumerThreadCount == 1,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});

Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(() => Processing(group, channel, _stoppingToken), _stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

return channel;
});
}

private async Task Sending(CancellationToken cancellationToken)
{
try
{
while (await _publishedChannel.Reader.WaitToReadAsync(cancellationToken))
{
while (_publishedChannel.Reader.TryRead(out var message))
{
try
{
var result = await _sender.SendAsync(message);
if (!result.Succeeded)
{
_logger.MessagePublishException(
message.Origin.GetId(),
result.ToString(),
result.Exception);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}");
}
}
}
}
catch (OperationCanceledException)
{
// expected
}
}

private async Task Processing(string group, Channel<(MediumMessage, ConsumerExecutorDescriptor)> channel, CancellationToken cancellationToken)
{
try
{
while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
while (channel.Reader.TryRead(out var message))
{
try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Dispatching message for group {ConsumerGroup}", group);
}

await _executor.DispatchAsync(message.Item1, message.Item2, cancellationToken);
}
catch (OperationCanceledException)
{
//expected
}
catch (Exception e)
{
_logger.LogError(e, $"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}");
}
}
}
}
catch (OperationCanceledException)
{
// expected
}
}

public void Dispose()
{
if (!_cts.IsCancellationRequested)
_cts.Cancel();
}
}
}

読み込み中…
キャンセル
保存