# Conflicts: # build/version.props # samples/Sample.Kafka.MySql/Sample.Kafka.MySql.csproj # samples/Sample.RabbitMQ.MySql/AppDbContext.cs # samples/Sample.RabbitMQ.MySql/Program.cs # samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj # src/DotNetCore.CAP/LoggerExtensions.csmaster
@@ -0,0 +1,40 @@ | |||||
<!-- | |||||
Thank you for reporting an issue. | |||||
1. It's RECOMMENDED to submit PR for typo or tiny bug fix. | |||||
2. If this's a FEATURE request, please provide: details, pseudo codes if necessary. | |||||
3. If this's a BUG, please provide: course repetition, error log and configuration. Fill in as much of the template below as you're able. | |||||
感谢您向我们反馈问题。 | |||||
1. 提交问题前,请先阅读 https://github.com/dotnetcore/CAP/wiki 上的文档。 | |||||
2. 我们推荐如果是小问题(错别字修改,小的 bug fix)直接提交 PR。 | |||||
3. 如果是一个新需求,请提供:详细需求描述,最好是有伪代码实现。 | |||||
4. 如果是一个 BUG,请提供:复现步骤,错误日志以及相关配置,并尽量填写下面的模板中的条目。 | |||||
6. 扩展阅读:[如何向开源项目提交无法解答的问题](https://zhuanlan.zhihu.com/p/25795393) | |||||
--> | |||||
Please answer these questions before submitting your issue. | |||||
- Why do you submit this issue? | |||||
- [ ] Question or discussion | |||||
- [ ] Bug | |||||
- [ ] Requirement | |||||
- [ ] Feature or performance improvement | |||||
___ | |||||
### Question | |||||
- What do you want to know? | |||||
___ | |||||
### Bug | |||||
- Which version of CAP, OS and .NET Core? | |||||
- Which company or project? | |||||
- What happen? | |||||
If possible, provide a way for reproducing the error. e.g. demo application, component version. | |||||
___ | |||||
### Requirement or improvement | |||||
- Please describe about your requirements or improvement suggestions. |
@@ -1,6 +1,9 @@ | |||||
language: cpp | |||||
language: csharp | |||||
sudo: required | sudo: required | ||||
dist: trusty | dist: trusty | ||||
solution: CAP.sln | |||||
dotnet: 2.1.300 | |||||
mono: none | |||||
matrix: | matrix: | ||||
include: | include: | ||||
@@ -10,33 +13,9 @@ matrix: | |||||
- os: osx | - os: osx | ||||
osx_image: xcode8.3 # macOS 10.12 | osx_image: xcode8.3 # macOS 10.12 | ||||
env: | |||||
global: | |||||
- DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true | |||||
- DOTNET_CLI_TELEMETRY_OPTOUT: 1 | |||||
- CLI_VERSION=2.0.0 | |||||
addons: | |||||
apt: | |||||
packages: | |||||
- gettext | |||||
- libcurl4-openssl-dev | |||||
- libicu-dev | |||||
- libssl-dev | |||||
- libunwind8 | |||||
- zlib1g | |||||
# Make sure build dependencies are installed. | |||||
before_install: | |||||
- if test "$TRAVIS_OS_NAME" == "osx"; then ln -s /usr/local/opt/openssl/lib/libcrypto.1.0.0.dylib /usr/local/lib/; ln -s /usr/local/opt/openssl/lib/libssl.1.0.0.dylib /usr/local/lib/; fi | |||||
- export DOTNET_INSTALL_DIR="$PWD/.dotnetcli" | |||||
- export PATH="$DOTNET_INSTALL_DIR:$PATH" | |||||
install: | |||||
- travis_retry curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin --channel 2.0 --version "$CLI_VERSION" --install-dir "$DOTNET_INSTALL_DIR" | |||||
# Run the build script | # Run the build script | ||||
script: | script: | ||||
- dotnet --info | - dotnet --info | ||||
- dotnet restore | |||||
- dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0 | |||||
- dotnet restore CAP.sln | |||||
- dotnet build CAP.sln | |||||
- dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj |
@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution | |||||
CHANGELOG.md = CHANGELOG.md | CHANGELOG.md = CHANGELOG.md | ||||
CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md | CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md | ||||
ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1 | ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1 | ||||
.github\ISSUE_TEMPLATE = .github\ISSUE_TEMPLATE | |||||
LICENSE.txt = LICENSE.txt | LICENSE.txt = LICENSE.txt | ||||
README.md = README.md | README.md = README.md | ||||
README.zh-cn.md = README.zh-cn.md | README.zh-cn.md = README.zh-cn.md | ||||
@@ -35,7 +36,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D | |||||
build.cake = build.cake | build.cake = build.cake | ||||
build.ps1 = build.ps1 | build.ps1 = build.ps1 | ||||
build.sh = build.sh | build.sh = build.sh | ||||
build\common.props = build\common.props | |||||
build\index.cake = build\index.cake | build\index.cake = build\index.cake | ||||
build\util.cake = build\util.cake | build\util.cake = build\util.cake | ||||
build\version.cake = build\version.cake | build\version.cake = build\version.cake | ||||
@@ -0,0 +1,18 @@ | |||||
<Project> | |||||
<Import Project="build\version.props" /> | |||||
<PropertyGroup Label="Package"> | |||||
<Product>CAP</Product> | |||||
<Authors>.NET Core Community;Savorboard</Authors> | |||||
<RepositoryUrl>https://github.com/dotnetcore/CAP</RepositoryUrl> | |||||
<RepositoryType>git</RepositoryType> | |||||
<RepositoryRoot>$(MSBuildThisFileDirectory)</RepositoryRoot> | |||||
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl> | |||||
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl> | |||||
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl> | |||||
<PackageTags>CAP;EventBus;Distributed Transaction</PackageTags> | |||||
<Description>EventBus and eventually consistency in distributed architectures.</Description> | |||||
</PropertyGroup> | |||||
</Project> |
@@ -3,7 +3,7 @@ | |||||
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) | [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) | ||||
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) | [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) | ||||
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) | [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) | ||||
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) | |||||
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore) | |||||
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) | [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) | ||||
CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently. | CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently. | ||||
@@ -3,7 +3,7 @@ | |||||
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) | [![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap) | ||||
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) | [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) | ||||
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) | [![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/) | ||||
[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore) | |||||
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore) | |||||
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) | [![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt) | ||||
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。 | CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。 | ||||
@@ -1,17 +0,0 @@ | |||||
<Project> | |||||
<Import Project="version.props" /> | |||||
<PropertyGroup Label="Package"> | |||||
<Product>CAP</Product> | |||||
<Authors>savorboard;dotnetcore</Authors> | |||||
<RepositoryUrl>https://github.com/dotnetcore/CAP</RepositoryUrl> | |||||
<RepositoryType>git</RepositoryType> | |||||
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl> | |||||
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl> | |||||
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl> | |||||
<PackageTags>CAP;EventBus;Distributed Transaction</PackageTags> | |||||
<Description>EventBus and eventually consistency in distributed architectures.</Description> | |||||
</PropertyGroup> | |||||
</Project> |
@@ -34,7 +34,7 @@ namespace Sample.Kafka.MySql.Controllers | |||||
return Ok("publish successful!"); | return Ok("publish successful!"); | ||||
} | } | ||||
[CapSubscribe("xxx.xxx.test2")] | |||||
[CapSubscribe("#.test2")] | |||||
public void Test2(int value) | public void Test2(int value) | ||||
{ | { | ||||
Console.WriteLine("Subscriber output message: " + value); | Console.WriteLine("Subscriber output message: " + value); | ||||
@@ -10,8 +10,8 @@ namespace Sample.Kafka.MySql | |||||
{ | { | ||||
services.AddCap(x => | services.AddCap(x => | ||||
{ | { | ||||
x.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;"); | |||||
x.UseKafka("192.168.10.110:9092"); | |||||
x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;"); | |||||
x.UseKafka("localhost:9092"); | |||||
x.UseDashboard(); | x.UseDashboard(); | ||||
}); | }); | ||||
@@ -0,0 +1,8 @@ | |||||
{ | |||||
"Logging": { | |||||
"IncludeScopes": false, | |||||
"LogLevel": { | |||||
"Default": "Debug" | |||||
} | |||||
} | |||||
} |
@@ -25,7 +25,6 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||||
return Ok(); | return Ok(); | ||||
} | } | ||||
[Route("~/publish2")] | [Route("~/publish2")] | ||||
public IActionResult PublishMessage2() | public IActionResult PublishMessage2() | ||||
{ | { | ||||
@@ -47,7 +46,7 @@ namespace Sample.RabbitMQ.MySql.Controllers | |||||
} | } | ||||
[NonAction] | [NonAction] | ||||
[CapSubscribe("sample.rabbitmq.mysql")] | |||||
[CapSubscribe("#.rabbitmq.mysql")] | |||||
public void ReceiveMessage(DateTime time) | public void ReceiveMessage(DateTime time) | ||||
{ | { | ||||
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time); | Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time); | ||||
@@ -23,9 +23,6 @@ namespace Sample.RabbitMQ.MySql | |||||
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) | public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) | ||||
{ | { | ||||
loggerFactory.AddConsole(); | |||||
loggerFactory.AddDebug(); | |||||
app.UseMvc(); | app.UseMvc(); | ||||
app.UseCap(); | app.UseCap(); | ||||
@@ -0,0 +1,8 @@ | |||||
{ | |||||
"Logging": { | |||||
"IncludeScopes": false, | |||||
"LogLevel": { | |||||
"Default": "Debug" | |||||
} | |||||
} | |||||
} |
@@ -1,7 +1,5 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<Import Project="..\..\build\common.props" /> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP.Kafka</AssemblyName> | <AssemblyName>DotNetCore.CAP.Kafka</AssemblyName> | ||||
@@ -6,23 +6,29 @@ using System.Collections.Concurrent; | |||||
using System.Diagnostics; | using System.Diagnostics; | ||||
using System.Threading; | using System.Threading; | ||||
using Confluent.Kafka; | using Confluent.Kafka; | ||||
using Microsoft.Extensions.Logging; | |||||
using Newtonsoft.Json; | |||||
namespace DotNetCore.CAP.Kafka | namespace DotNetCore.CAP.Kafka | ||||
{ | { | ||||
public class ConnectionPool : IConnectionPool, IDisposable | public class ConnectionPool : IConnectionPool, IDisposable | ||||
{ | { | ||||
private readonly ILogger<ConnectionPool> _logger; | |||||
private readonly Func<Producer> _activator; | private readonly Func<Producer> _activator; | ||||
private readonly ConcurrentQueue<Producer> _pool = new ConcurrentQueue<Producer>(); | |||||
private readonly ConcurrentQueue<Producer> _pool; | |||||
private int _count; | private int _count; | ||||
private int _maxSize; | private int _maxSize; | ||||
public ConnectionPool(KafkaOptions options) | |||||
public ConnectionPool(ILogger<ConnectionPool> logger, KafkaOptions options) | |||||
{ | { | ||||
_logger = logger; | |||||
_pool = new ConcurrentQueue<Producer>(); | |||||
_maxSize = options.ConnectionPoolSize; | _maxSize = options.ConnectionPoolSize; | ||||
_activator = CreateActivator(options); | _activator = CreateActivator(options); | ||||
ServersAddress = options.Servers; | ServersAddress = options.Servers; | ||||
_logger.LogDebug("Kafka configuration of CAP :\r\n {0}", | |||||
JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented)); | |||||
} | } | ||||
public string ServersAddress { get; } | public string ServersAddress { get; } | ||||
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP | |||||
services.AddSingleton<IStorageConnection, MySqlStorageConnection>(); | services.AddSingleton<IStorageConnection, MySqlStorageConnection>(); | ||||
services.AddScoped<ICapPublisher, CapPublisher>(); | services.AddScoped<ICapPublisher, CapPublisher>(); | ||||
services.AddScoped<ICallbackPublisher, CapPublisher>(); | services.AddScoped<ICallbackPublisher, CapPublisher>(); | ||||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | |||||
services.AddTransient<ICollectProcessor, MySqlCollectProcessor>(); | |||||
AddSingletionMySqlOptions(services); | AddSingletionMySqlOptions(services); | ||||
} | } | ||||
@@ -1,7 +1,5 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<Import Project="..\..\build\common.props" /> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName> | <AssemblyName>DotNetCore.CAP.MySql</AssemblyName> | ||||
@@ -14,10 +12,10 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> | |||||
<PackageReference Include="MySqlConnector" Version="0.38.0" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> | |||||
<PackageReference Include="MySqlConnector" Version="0.40.4" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -10,7 +10,7 @@ using MySql.Data.MySqlClient; | |||||
namespace DotNetCore.CAP.MySql | namespace DotNetCore.CAP.MySql | ||||
{ | { | ||||
internal class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
internal class MySqlCollectProcessor : ICollectProcessor | |||||
{ | { | ||||
private const int MaxBatch = 1000; | private const int MaxBatch = 1000; | ||||
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); | private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); | ||||
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.MySql | |||||
private readonly MySqlOptions _options; | private readonly MySqlOptions _options; | ||||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | ||||
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, | |||||
public MySqlCollectProcessor(ILogger<MySqlCollectProcessor> logger, | |||||
MySqlOptions mysqlOptions) | MySqlOptions mysqlOptions) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
@@ -27,8 +27,6 @@ namespace DotNetCore.CAP.MySql | |||||
public async Task ProcessAsync(ProcessingContext context) | public async Task ProcessAsync(ProcessingContext context) | ||||
{ | { | ||||
_logger.LogDebug("Collecting expired entities."); | |||||
var tables = new[] | var tables = new[] | ||||
{ | { | ||||
$"{_options.TableNamePrefix}.published", | $"{_options.TableNamePrefix}.published", | ||||
@@ -37,6 +35,8 @@ namespace DotNetCore.CAP.MySql | |||||
foreach (var table in tables) | foreach (var table in tables) | ||||
{ | { | ||||
_logger.LogDebug($"Collecting expired data from table [{table}]."); | |||||
int removedCount; | int removedCount; | ||||
do | do | ||||
{ | { |
@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix); | |||||
{ | { | ||||
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state"; | var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state"; | ||||
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); | |||||
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); | |||||
return count; | return count; | ||||
} | } | ||||
@@ -167,10 +167,10 @@ select aggr.* from ( | |||||
group by date_format(`Added`,'%Y-%m-%d-%H') | group by date_format(`Added`,'%Y-%m-%d-%H') | ||||
) aggr where `Key` in @keys;"; | ) aggr where `Key` in @keys;"; | ||||
var valuesMap = connection.Query( | |||||
var valuesMap = connection.Query<TimelineCounter>( | |||||
sqlQuery, | sqlQuery, | ||||
new {keys = keyMaps.Keys, statusName}) | |||||
.ToDictionary(x => (string) x.Key, x => (int) x.Count); | |||||
new { keys = keyMaps.Keys, statusName }) | |||||
.ToDictionary(x => x.Key, x => x.Count); | |||||
foreach (var key in keyMaps.Keys) | foreach (var key in keyMaps.Keys) | ||||
{ | { | ||||
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP | |||||
services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>(); | services.AddSingleton<IStorageConnection, PostgreSqlStorageConnection>(); | ||||
services.AddScoped<ICapPublisher, CapPublisher>(); | services.AddScoped<ICapPublisher, CapPublisher>(); | ||||
services.AddScoped<ICallbackPublisher, CapPublisher>(); | services.AddScoped<ICallbackPublisher, CapPublisher>(); | ||||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | |||||
services.AddTransient<ICollectProcessor, PostgreSqlCollectProcessor>(); | |||||
AddSingletonPostgreSqlOptions(services); | AddSingletonPostgreSqlOptions(services); | ||||
} | } | ||||
@@ -1,7 +1,5 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<Import Project="..\..\build\common.props" /> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName> | <AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName> | ||||
@@ -14,10 +12,10 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> | |||||
<PackageReference Include="Npgsql" Version="3.2.7" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> | |||||
<PackageReference Include="Npgsql" Version="4.0.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -10,7 +10,7 @@ using Npgsql; | |||||
namespace DotNetCore.CAP.PostgreSql | namespace DotNetCore.CAP.PostgreSql | ||||
{ | { | ||||
internal class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
internal class PostgreSqlCollectProcessor : ICollectProcessor | |||||
{ | { | ||||
private const int MaxBatch = 1000; | private const int MaxBatch = 1000; | ||||
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
private readonly PostgreSqlOptions _options; | private readonly PostgreSqlOptions _options; | ||||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | ||||
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, | |||||
public PostgreSqlCollectProcessor(ILogger<PostgreSqlCollectProcessor> logger, | |||||
PostgreSqlOptions sqlServerOptions) | PostgreSqlOptions sqlServerOptions) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.PostgreSql | |||||
public async Task ProcessAsync(ProcessingContext context) | public async Task ProcessAsync(ProcessingContext context) | ||||
{ | { | ||||
_logger.LogDebug("Collecting expired entities."); | |||||
foreach (var table in Tables) | foreach (var table in Tables) | ||||
{ | { | ||||
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}]."); | |||||
var removedCount = 0; | var removedCount = 0; | ||||
do | do | ||||
{ | { |
@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' | |||||
var sqlQuery = | var sqlQuery = | ||||
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; | $"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; | ||||
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); | |||||
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); | |||||
return count; | return count; | ||||
} | } | ||||
@@ -170,9 +170,9 @@ with aggr as ( | |||||
) | ) | ||||
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; | select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; | ||||
var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName}) | |||||
var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName }) | |||||
.ToList() | .ToList() | ||||
.ToDictionary(x => (string) x.Key, x => (int) x.Count); | |||||
.ToDictionary(x => x.Key, x => x.Count); | |||||
foreach (var key in keyMaps.Keys) | foreach (var key in keyMaps.Keys) | ||||
{ | { | ||||
@@ -1,7 +1,5 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<Import Project="..\..\build\common.props" /> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName> | <AssemblyName>DotNetCore.CAP.RabbitMQ</AssemblyName> | ||||
@@ -6,6 +6,7 @@ using System.Collections.Concurrent; | |||||
using System.Diagnostics; | using System.Diagnostics; | ||||
using System.Threading; | using System.Threading; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using Newtonsoft.Json; | |||||
using RabbitMQ.Client; | using RabbitMQ.Client; | ||||
namespace DotNetCore.CAP.RabbitMQ | namespace DotNetCore.CAP.RabbitMQ | ||||
@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private const int DefaultPoolSize = 15; | private const int DefaultPoolSize = 15; | ||||
private readonly Func<IConnection> _connectionActivator; | private readonly Func<IConnection> _connectionActivator; | ||||
private readonly ILogger<ConnectionChannelPool> _logger; | private readonly ILogger<ConnectionChannelPool> _logger; | ||||
private readonly ConcurrentQueue<IModel> _pool = new ConcurrentQueue<IModel>(); | |||||
private readonly ConcurrentQueue<IModel> _pool; | |||||
private IConnection _connection; | private IConnection _connection; | ||||
private int _count; | private int _count; | ||||
private int _maxSize; | private int _maxSize; | ||||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, | |||||
RabbitMQOptions options) | |||||
public ConnectionChannelPool(ILogger<ConnectionChannelPool> logger, RabbitMQOptions options) | |||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_maxSize = DefaultPoolSize; | _maxSize = DefaultPoolSize; | ||||
_pool = new ConcurrentQueue<IModel>(); | |||||
_connectionActivator = CreateConnection(options); | _connectionActivator = CreateConnection(options); | ||||
HostAddress = options.HostName + ":" + options.Port; | HostAddress = options.HostName + ":" + options.Port; | ||||
Exchange = options.ExchangeName; | Exchange = options.ExchangeName; | ||||
_logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}", | |||||
JsonConvert.SerializeObject(options, Formatting.Indented)); | |||||
} | } | ||||
IModel IConnectionChannelPool.Rent() | IModel IConnectionChannelPool.Rent() | ||||
@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) | private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) | ||||
{ | { | ||||
_logger.LogWarning($"RabbitMQ client connection closed! {e}"); | |||||
_logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}"); | |||||
} | } | ||||
public virtual IModel Rent() | public virtual IModel Rent() | ||||
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
_connection = _connectionChannelPool.GetConnection(); | _connection = _connectionChannelPool.GetConnection(); | ||||
_channel = _connection.CreateModel(); | _channel = _connection.CreateModel(); | ||||
_channel.ExchangeDeclare( | _channel.ExchangeDeclare( | ||||
_exchageName, | _exchageName, | ||||
RabbitMQOptions.ExchangeType, | RabbitMQOptions.ExchangeType, | ||||
@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||||
var args = new LogMessageEventArgs | var args = new LogMessageEventArgs | ||||
{ | { | ||||
LogType = MqLogType.ConsumerShutdown, | LogType = MqLogType.ConsumerShutdown, | ||||
Reason = e.ToString() | |||||
Reason = e.ReplyText | |||||
}; | }; | ||||
OnLog?.Invoke(sender, args); | OnLog?.Invoke(sender, args); | ||||
} | } | ||||
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP | |||||
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); | services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); | ||||
services.AddScoped<ICapPublisher, CapPublisher>(); | services.AddScoped<ICapPublisher, CapPublisher>(); | ||||
services.AddScoped<ICallbackPublisher, CapPublisher>(); | services.AddScoped<ICallbackPublisher, CapPublisher>(); | ||||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | |||||
services.AddTransient<ICollectProcessor, SqlServerCollectProcessor>(); | |||||
AddSqlServerOptions(services); | AddSqlServerOptions(services); | ||||
} | } | ||||
@@ -1,11 +1,10 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<Import Project="..\..\build\common.props" /> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> | <AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> | ||||
<PackageTags>$(PackageTags);SQL Server</PackageTags> | <PackageTags>$(PackageTags);SQL Server</PackageTags> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
@@ -14,10 +13,10 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.0.2" /> | |||||
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="2.1.0" /> | |||||
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging; | |||||
namespace DotNetCore.CAP.SqlServer | namespace DotNetCore.CAP.SqlServer | ||||
{ | { | ||||
public class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
public class SqlServerCollectProcessor : ICollectProcessor | |||||
{ | { | ||||
private const int MaxBatch = 1000; | private const int MaxBatch = 1000; | ||||
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
private readonly SqlServerOptions _options; | private readonly SqlServerOptions _options; | ||||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | ||||
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, | |||||
public SqlServerCollectProcessor(ILogger<SqlServerCollectProcessor> logger, | |||||
SqlServerOptions sqlServerOptions) | SqlServerOptions sqlServerOptions) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.SqlServer | |||||
public async Task ProcessAsync(ProcessingContext context) | public async Task ProcessAsync(ProcessingContext context) | ||||
{ | { | ||||
_logger.LogDebug("Collecting expired entities."); | |||||
foreach (var table in Tables) | foreach (var table in Tables) | ||||
{ | { | ||||
_logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}]."); | |||||
int removedCount; | int removedCount; | ||||
do | do | ||||
{ | { |
@@ -128,7 +128,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; | |||||
var sqlQuery = | var sqlQuery = | ||||
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state"; | $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state"; | ||||
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); | |||||
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); | |||||
return count; | return count; | ||||
} | } | ||||
@@ -171,10 +171,10 @@ with aggr as ( | |||||
) | ) | ||||
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; | select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; | ||||
var valuesMap = connection.Query( | |||||
var valuesMap = connection.Query<TimelineCounter>( | |||||
sqlQuery, | sqlQuery, | ||||
new {keys = keyMaps.Keys, statusName}) | |||||
.ToDictionary(x => (string) x.Key, x => (int) x.Count); | |||||
new { keys = keyMaps.Keys, statusName }) | |||||
.ToDictionary(x => x.Key, x => x.Count); | |||||
foreach (var key in keyMaps.Keys) | foreach (var key in keyMaps.Keys) | ||||
{ | { | ||||
@@ -0,0 +1,8 @@ | |||||
namespace DotNetCore.CAP.Dashboard | |||||
{ | |||||
public class TimelineCounter | |||||
{ | |||||
public string Key { get; set; } | |||||
public int Count { get; set; } | |||||
} | |||||
} |
@@ -1,32 +1,14 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<Import Project="..\..\build\common.props" /> | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.0</TargetFramework> | ||||
<AssemblyName>DotNetCore.CAP</AssemblyName> | |||||
<PackageTags>$(PackageTags);</PackageTags> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml</DocumentationFile> | <DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml</DocumentationFile> | ||||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | <NoWarn>1701;1702;1705;CS1591</NoWarn> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | |||||
<None Remove="Dashboard\Content\css\bootstrap.min.css" /> | |||||
<None Remove="Dashboard\Content\css\jsonview.min.css" /> | |||||
<None Remove="Dashboard\Content\css\rickshaw.min.css" /> | |||||
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.eot" /> | |||||
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.svg" /> | |||||
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.ttf" /> | |||||
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.woff" /> | |||||
<None Remove="Dashboard\Content\fonts\glyphicons-halflings-regular.woff2" /> | |||||
<None Remove="Dashboard\Content\js\bootstrap.min.js" /> | |||||
<None Remove="Dashboard\Content\js\d3.layout.min.js" /> | |||||
<None Remove="Dashboard\Content\js\d3.min.js" /> | |||||
<None Remove="Dashboard\Content\js\jquery-2.1.4.min.js" /> | |||||
<None Remove="Dashboard\Content\js\jsonview.min.js" /> | |||||
<None Remove="Dashboard\Content\js\moment-with-locales.min.js" /> | |||||
<None Remove="Dashboard\Content\js\moment.min.js" /> | |||||
<None Remove="Dashboard\Content\js\rickshaw.min.js" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | <ItemGroup> | ||||
<EmbeddedResource Include="Dashboard\Content\css\bootstrap.min.css" /> | <EmbeddedResource Include="Dashboard\Content\css\bootstrap.min.css" /> | ||||
<EmbeddedResource Include="Dashboard\Content\css\cap.css" /> | <EmbeddedResource Include="Dashboard\Content\css\cap.css" /> | ||||
@@ -47,19 +29,21 @@ | |||||
<EmbeddedResource Include="Dashboard\Content\js\moment.min.js" /> | <EmbeddedResource Include="Dashboard\Content\js\moment.min.js" /> | ||||
<EmbeddedResource Include="Dashboard\Content\js\rickshaw.min.js" /> | <EmbeddedResource Include="Dashboard\Content\js\rickshaw.min.js" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Consul" Version="0.7.2.4" /> | <PackageReference Include="Consul" Version="0.7.2.4" /> | ||||
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | <PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | ||||
<PackageReference Include="System.Data.Common" Version="4.3.0" /> | <PackageReference Include="System.Data.Common" Version="4.3.0" /> | ||||
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.4.1" /> | |||||
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.0" /> | |||||
<PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> | <PackageReference Include="System.Threading.ThreadPool" Version="4.3.0" /> | ||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<Compile Update="Dashboard\Content\resx\Strings.Designer.cs"> | <Compile Update="Dashboard\Content\resx\Strings.Designer.cs"> | ||||
<DesignTime>True</DesignTime> | <DesignTime>True</DesignTime> | ||||
@@ -143,9 +127,4 @@ | |||||
<LastGenOutput>Strings.Designer.cs</LastGenOutput> | <LastGenOutput>Strings.Designer.cs</LastGenOutput> | ||||
</EmbeddedResource> | </EmbeddedResource> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | |||||
<None Update="Dashboard\Pages\PublishedPage.cshtml"> | |||||
<Generator>RazorGenerator</Generator> | |||||
</None> | |||||
</ItemGroup> | |||||
</Project> | </Project> |
@@ -59,6 +59,8 @@ namespace DotNetCore.CAP | |||||
private async Task BootstrapTaskAsync() | private async Task BootstrapTaskAsync() | ||||
{ | { | ||||
_logger.LogInformation("### CAP starting..."); | |||||
await Storage.InitializeAsync(_cts.Token); | await Storage.InitializeAsync(_cts.Token); | ||||
if (_cts.IsCancellationRequested) | if (_cts.IsCancellationRequested) | ||||
@@ -83,6 +85,8 @@ namespace DotNetCore.CAP | |||||
_ctsRegistration.Dispose(); | _ctsRegistration.Dispose(); | ||||
_cts.Dispose(); | _cts.Dispose(); | ||||
_logger.LogInformation("### CAP started!"); | |||||
} | } | ||||
protected virtual Task BootstrapCoreAsync() | protected virtual Task BootstrapCoreAsync() | ||||
@@ -146,22 +146,22 @@ namespace DotNetCore.CAP | |||||
switch (logmsg.LogType) | switch (logmsg.LogType) | ||||
{ | { | ||||
case MqLogType.ConsumerCancelled: | case MqLogType.ConsumerCancelled: | ||||
_logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerRegistered: | case MqLogType.ConsumerRegistered: | ||||
_logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason); | |||||
_logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerUnregistered: | case MqLogType.ConsumerUnregistered: | ||||
_logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumerShutdown: | case MqLogType.ConsumerShutdown: | ||||
_logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason); | |||||
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ConsumeError: | case MqLogType.ConsumeError: | ||||
_logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason); | |||||
_logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); | |||||
break; | break; | ||||
case MqLogType.ServerConnError: | case MqLogType.ServerConnError: | ||||
_logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason); | |||||
_logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); | |||||
break; | break; | ||||
default: | default: | ||||
throw new ArgumentOutOfRangeException(); | throw new ArgumentOutOfRangeException(); | ||||
@@ -72,7 +72,7 @@ namespace DotNetCore.CAP | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
_logger.ExceptionOccuredWhileExecuting(message.Name, ex); | |||||
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}"); | |||||
await SetFailedState(message, ex, out bool stillRetry); | await SetFailedState(message, ex, out bool stillRetry); | ||||
if (stillRetry) | if (stillRetry) | ||||
@@ -5,6 +5,7 @@ using System; | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Linq; | using System.Linq; | ||||
using System.Reflection; | using System.Reflection; | ||||
using System.Text.RegularExpressions; | |||||
using DotNetCore.CAP.Abstractions; | using DotNetCore.CAP.Abstractions; | ||||
using DotNetCore.CAP.Infrastructure; | using DotNetCore.CAP.Infrastructure; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
@@ -19,6 +20,8 @@ namespace DotNetCore.CAP.Internal | |||||
{ | { | ||||
private readonly CapOptions _capOptions; | private readonly CapOptions _capOptions; | ||||
private readonly IServiceProvider _serviceProvider; | private readonly IServiceProvider _serviceProvider; | ||||
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _asteriskList; | |||||
private List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>> _poundList; | |||||
/// <summary> | /// <summary> | ||||
/// Creates a new <see cref="DefaultConsumerServiceSelector" />. | /// Creates a new <see cref="DefaultConsumerServiceSelector" />. | ||||
@@ -29,17 +32,6 @@ namespace DotNetCore.CAP.Internal | |||||
_capOptions = capOptions; | _capOptions = capOptions; | ||||
} | } | ||||
/// <summary> | |||||
/// Selects the best <see cref="ConsumerExecutorDescriptor" /> candidate from <paramref name="executeDescriptor" /> for | |||||
/// the | |||||
/// current message associated. | |||||
/// </summary> | |||||
public ConsumerExecutorDescriptor SelectBestCandidate(string key, | |||||
IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||||
{ | |||||
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key); | |||||
} | |||||
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates() | public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates() | ||||
{ | { | ||||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | ||||
@@ -51,6 +43,26 @@ namespace DotNetCore.CAP.Internal | |||||
return executorDescriptorList; | return executorDescriptorList; | ||||
} | } | ||||
public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||||
{ | |||||
var result = MatchUsingName(key, executeDescriptor); | |||||
if (result != null) | |||||
{ | |||||
return result; | |||||
} | |||||
//[*] match with regex, i.e. foo.*.abc | |||||
result = MatchAsteriskUsingRegex(key, executeDescriptor); | |||||
if (result != null) | |||||
{ | |||||
return result; | |||||
} | |||||
//[#] match regex, i.e. foo.# | |||||
result = MatchPoundUsingRegex(key, executeDescriptor); | |||||
return result; | |||||
} | |||||
private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes( | private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes( | ||||
IServiceProvider provider) | IServiceProvider provider) | ||||
{ | { | ||||
@@ -130,5 +142,65 @@ namespace DotNetCore.CAP.Internal | |||||
return descriptor; | return descriptor; | ||||
} | } | ||||
private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||||
{ | |||||
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key); | |||||
} | |||||
private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||||
{ | |||||
if (_asteriskList == null) | |||||
{ | |||||
_asteriskList = executeDescriptor | |||||
.Where(x => x.Attribute.Name.IndexOf('*') >= 0) | |||||
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor> | |||||
{ | |||||
Name = ("^" + x.Attribute.Name + "$").Replace("*", "[a-zA-Z]+").Replace(".", "\\."), | |||||
Descriptor = x | |||||
}).ToList(); | |||||
} | |||||
foreach (var red in _asteriskList) | |||||
{ | |||||
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) | |||||
{ | |||||
return red.Descriptor; | |||||
} | |||||
} | |||||
return null; | |||||
} | |||||
private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor) | |||||
{ | |||||
if (_poundList == null) | |||||
{ | |||||
_poundList = executeDescriptor | |||||
.Where(x => x.Attribute.Name.IndexOf('#') >= 0) | |||||
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor> | |||||
{ | |||||
Name = ("^" + x.Attribute.Name + "$").Replace("#", "[a-zA-Z\\.]+"), | |||||
Descriptor = x | |||||
}).ToList(); | |||||
} | |||||
foreach (var red in _poundList) | |||||
{ | |||||
if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline)) | |||||
{ | |||||
return red.Descriptor; | |||||
} | |||||
} | |||||
return null; | |||||
} | |||||
private class RegexExecuteDescriptor<T> | |||||
{ | |||||
public string Name { get; set; } | |||||
public T Descriptor { get; set; } | |||||
} | |||||
} | } | ||||
} | } |
@@ -22,8 +22,6 @@ namespace DotNetCore.CAP.Internal | |||||
/// </summary> | /// </summary> | ||||
/// <param name="key">topic or exchange router key.</param> | /// <param name="key">topic or exchange router key.</param> | ||||
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param> | /// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param> | ||||
/// <returns></returns> | |||||
ConsumerExecutorDescriptor | |||||
SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates); | |||||
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates); | |||||
} | } | ||||
} | } |
@@ -11,21 +11,20 @@ namespace DotNetCore.CAP.Internal | |||||
internal class MethodMatcherCache | internal class MethodMatcherCache | ||||
{ | { | ||||
private readonly IConsumerServiceSelector _selector; | private readonly IConsumerServiceSelector _selector; | ||||
private List<string> _allTopics; | |||||
public MethodMatcherCache(IConsumerServiceSelector selector) | public MethodMatcherCache(IConsumerServiceSelector selector) | ||||
{ | { | ||||
_selector = selector; | _selector = selector; | ||||
Entries = new ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>>(); | |||||
Entries = new ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>>(); | |||||
} | } | ||||
private ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> Entries { get; } | |||||
private ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>> Entries { get; } | |||||
/// <summary> | /// <summary> | ||||
/// Get a dictionary of candidates.In the dictionary, | /// Get a dictionary of candidates.In the dictionary, | ||||
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates | /// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates | ||||
/// </summary> | /// </summary> | ||||
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped() | |||||
public ConcurrentDictionary<string, IReadOnlyList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped() | |||||
{ | { | ||||
if (Entries.Count != 0) | if (Entries.Count != 0) | ||||
{ | { | ||||
@@ -44,28 +43,6 @@ namespace DotNetCore.CAP.Internal | |||||
return Entries; | return Entries; | ||||
} | } | ||||
/// <summary> | |||||
/// Get a dictionary of specify topic candidates. | |||||
/// The Key is Group name, the value is specify topic candidates. | |||||
/// </summary> | |||||
/// <param name="topicName">message topic name</param> | |||||
public IDictionary<string, IList<ConsumerExecutorDescriptor>> GetTopicExector(string topicName) | |||||
{ | |||||
if (Entries == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(Entries)); | |||||
} | |||||
var dic = new Dictionary<string, IList<ConsumerExecutorDescriptor>>(); | |||||
foreach (var item in Entries) | |||||
{ | |||||
var topicCandidates = item.Value.Where(x => x.Attribute.Name == topicName); | |||||
dic.Add(item.Key, topicCandidates.ToList()); | |||||
} | |||||
return dic; | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// Attempts to get the topic exector associated with the specified topic name and group name from the | /// Attempts to get the topic exector associated with the specified topic name and group name from the | ||||
/// <see cref="Entries" />. | /// <see cref="Entries" />. | ||||
@@ -86,36 +63,12 @@ namespace DotNetCore.CAP.Internal | |||||
if (Entries.TryGetValue(groupName, out var groupMatchTopics)) | if (Entries.TryGetValue(groupName, out var groupMatchTopics)) | ||||
{ | { | ||||
matchTopic = groupMatchTopics.FirstOrDefault(x => x.Attribute.Name == topicName); | |||||
matchTopic = _selector.SelectBestCandidate(topicName, groupMatchTopics); | |||||
return matchTopic != null; | return matchTopic != null; | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
/// <summary> | |||||
/// Get all subscribe topics name. | |||||
/// </summary> | |||||
public IEnumerable<string> GetSubscribeTopics() | |||||
{ | |||||
if (_allTopics != null) | |||||
{ | |||||
return _allTopics; | |||||
} | |||||
if (Entries == null) | |||||
{ | |||||
throw new ArgumentNullException(nameof(Entries)); | |||||
} | |||||
_allTopics = new List<string>(); | |||||
foreach (var descriptors in Entries.Values) | |||||
{ | |||||
_allTopics.AddRange(descriptors.Select(x => x.Attribute.Name)); | |||||
} | |||||
return _allTopics; | |||||
} | |||||
} | } | ||||
} | } |
@@ -3,7 +3,7 @@ | |||||
namespace DotNetCore.CAP.Processor | namespace DotNetCore.CAP.Processor | ||||
{ | { | ||||
public interface IAdditionalProcessor : IProcessor | |||||
public interface ICollectProcessor : IProcessor | |||||
{ | { | ||||
} | } | ||||
} | } |
@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Processor | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
_logger.ExceptionOccuredWhileExecuting(message.Name, ex); | |||||
_logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Topic:{message.Name}, Id:{message.Id}"); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -82,14 +82,7 @@ namespace DotNetCore.CAP.Processor | |||||
{ | { | ||||
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) | foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) | ||||
{ | { | ||||
try | |||||
{ | |||||
_executor.ExecuteAsync(message); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
_logger.ExceptionOccuredWhileExecuting(message.Name, ex); | |||||
} | |||||
_executor.ExecuteAsync(message); | |||||
} | } | ||||
} | } | ||||
catch (OperationCanceledException) | catch (OperationCanceledException) | ||||
@@ -57,13 +57,14 @@ namespace DotNetCore.CAP.Processor | |||||
return; | return; | ||||
} | } | ||||
_disposed = true; | |||||
_logger.ServerShuttingDown(); | |||||
_cts.Cancel(); | |||||
try | try | ||||
{ | { | ||||
_compositeTask.Wait((int) TimeSpan.FromSeconds(10).TotalMilliseconds); | |||||
_disposed = true; | |||||
_logger.ServerShuttingDown(); | |||||
_cts.Cancel(); | |||||
_compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds); | |||||
} | } | ||||
catch (AggregateException ex) | catch (AggregateException ex) | ||||
{ | { | ||||
@@ -73,6 +74,14 @@ namespace DotNetCore.CAP.Processor | |||||
_logger.ExpectedOperationCanceledException(innerEx); | _logger.ExpectedOperationCanceledException(innerEx); | ||||
} | } | ||||
} | } | ||||
catch (Exception ex) | |||||
{ | |||||
_logger.LogWarning(ex, "An exception was occured when disposing."); | |||||
} | |||||
finally | |||||
{ | |||||
_logger.LogInformation("### CAP shutdown!"); | |||||
} | |||||
} | } | ||||
private IProcessor InfiniteRetry(IProcessor inner) | private IProcessor InfiniteRetry(IProcessor inner) | ||||
@@ -85,7 +94,7 @@ namespace DotNetCore.CAP.Processor | |||||
var returnedProcessors = new List<IProcessor> | var returnedProcessors = new List<IProcessor> | ||||
{ | { | ||||
_provider.GetRequiredService<NeedRetryMessageProcessor>(), | _provider.GetRequiredService<NeedRetryMessageProcessor>(), | ||||
_provider.GetRequiredService<IAdditionalProcessor>() | |||||
_provider.GetRequiredService<ICollectProcessor>() | |||||
}; | }; | ||||
return returnedProcessors.ToArray(); | return returnedProcessors.ToArray(); | ||||
@@ -1,11 +1,8 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | |||||
<AssemblyName>DotNetCore.CAP.MySql.Test</AssemblyName> | |||||
<PackageId>DotNetCore.CAP.MySql.Test</PackageId> | |||||
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -14,20 +11,20 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> | |||||
<PackageReference Include="MySqlConnector" Version="0.38.0" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" /> | |||||
<PackageReference Include="MySqlConnector" Version="0.40.4" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | ||||
<PackageReference Include="xunit" Version="2.3.1" /> | <PackageReference Include="xunit" Version="2.3.1" /> | ||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> | |||||
<PackageReference Include="Moq" Version="4.8.2" /> | <PackageReference Include="Moq" Version="4.8.2" /> | ||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
</Project> | </Project> |
@@ -1,15 +1,14 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | <IsPackable>false</IsPackable> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> | |||||
<PackageReference Include="Npgsql" Version="3.2.7" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" /> | |||||
<PackageReference Include="Npgsql" Version="4.0.0" /> | |||||
<PackageReference Include="xunit" Version="2.3.1" /> | <PackageReference Include="xunit" Version="2.3.1" /> | ||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -1,7 +1,7 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | <IsPackable>false</IsPackable> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -11,20 +11,20 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.50.4" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> | |||||
<PackageReference Include="System.Data.SqlClient" Version="4.4.3" /> | |||||
<PackageReference Include="Dapper" Version="1.50.5" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" /> | |||||
<PackageReference Include="System.Data.SqlClient" Version="4.5.0" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | ||||
<PackageReference Include="xunit" Version="2.3.1" /> | <PackageReference Include="xunit" Version="2.3.1" /> | ||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> | |||||
<PackageReference Include="Moq" Version="4.8.2" /> | <PackageReference Include="Moq" Version="4.8.2" /> | ||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
</Project> | </Project> |
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.Test | |||||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | ||||
var candidates = selector.SelectCandidates(); | var candidates = selector.SelectCandidates(); | ||||
Assert.Equal(2, candidates.Count); | |||||
Assert.Equal(6, candidates.Count); | |||||
} | } | ||||
[Fact] | [Fact] | ||||
@@ -42,6 +42,66 @@ namespace DotNetCore.CAP.Test | |||||
Assert.NotNull(bestCandidates.MethodInfo); | Assert.NotNull(bestCandidates.MethodInfo); | ||||
Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType); | Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType); | ||||
} | } | ||||
[Theory] | |||||
[InlineData("Candidates.Asterisk")] | |||||
[InlineData("candidates.Asterisk")] | |||||
[InlineData("AAA.BBB.Asterisk")] | |||||
[InlineData("aaa.bbb.Asterisk")] | |||||
public void CanFindAsteriskTopic(string topic) | |||||
{ | |||||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||||
var candidates = selector.SelectCandidates(); | |||||
var bestCandidates = selector.SelectBestCandidate(topic, candidates); | |||||
Assert.NotNull(bestCandidates); | |||||
} | |||||
[Theory] | |||||
[InlineData("Candidates.Asterisk.AAA")] | |||||
[InlineData("AAA.BBB.CCC.Asterisk")] | |||||
[InlineData("aaa.BBB.ccc.Asterisk")] | |||||
[InlineData("Asterisk.aaa.bbb")] | |||||
public void CanNotFindAsteriskTopic(string topic) | |||||
{ | |||||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||||
var candidates = selector.SelectCandidates(); | |||||
var bestCandidates = selector.SelectBestCandidate(topic, candidates); | |||||
Assert.Null(bestCandidates); | |||||
} | |||||
[Theory] | |||||
[InlineData("Candidates.Pound.AAA")] | |||||
[InlineData("Candidates.Pound.AAA.BBB")] | |||||
[InlineData("AAA.Pound")] | |||||
[InlineData("aaa.Pound")] | |||||
[InlineData("aaa.bbb.Pound")] | |||||
[InlineData("aaa.BBB.Pound")] | |||||
public void CanFindPoundTopic(string topic) | |||||
{ | |||||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||||
var candidates = selector.SelectCandidates(); | |||||
var bestCandidates = selector.SelectBestCandidate(topic, candidates); | |||||
Assert.NotNull(bestCandidates); | |||||
} | |||||
[Theory] | |||||
[InlineData("Pound")] | |||||
[InlineData("aaa.Pound.AAA.BBB")] | |||||
[InlineData("Pound.AAA")] | |||||
[InlineData("Pound.aaa")] | |||||
[InlineData("AAA.Pound.aaa")] | |||||
public void CanNotFindPoundTopic(string topic) | |||||
{ | |||||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||||
var candidates = selector.SelectCandidates(); | |||||
var bestCandidates = selector.SelectBestCandidate(topic, candidates); | |||||
Assert.Null(bestCandidates); | |||||
} | |||||
} | } | ||||
public class CandidatesTopic : TopicAttribute | public class CandidatesTopic : TopicAttribute | ||||
@@ -73,6 +133,21 @@ namespace DotNetCore.CAP.Test | |||||
{ | { | ||||
Console.WriteLine("GetFoo2() method has bee excuted."); | Console.WriteLine("GetFoo2() method has bee excuted."); | ||||
} | } | ||||
[CandidatesTopic("*.*.Asterisk")] | |||||
[CandidatesTopic("*.Asterisk")] | |||||
public void GetFooAsterisk() | |||||
{ | |||||
Console.WriteLine("GetFoo2Asterisk() method has bee excuted."); | |||||
} | |||||
[CandidatesTopic("Candidates.Pound.#")] | |||||
[CandidatesTopic("#.Pound")] | |||||
public void GetFooPound() | |||||
{ | |||||
Console.WriteLine("GetFoo2Pound() method has bee excuted."); | |||||
} | |||||
} | } | ||||
public class CandidatesBarTest : IBarTest | public class CandidatesBarTest : IBarTest | ||||
@@ -0,0 +1,241 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.Reflection; | |||||
using System.Runtime.CompilerServices; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
using DotNetCore.CAP.Internal; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.Test | |||||
{ | |||||
public class DiagnosticsTest | |||||
{ | |||||
private static readonly DiagnosticListener s_diagnosticListener = | |||||
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); | |||||
[Fact] | |||||
public void WritePublishBeforeTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerPublishEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow); | |||||
s_diagnosticListener.WritePublishBefore(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforePublish)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerPublishEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerPublishEventData)kvp.Value).OperationId); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WritePublishAfterTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerPublishEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1)); | |||||
s_diagnosticListener.WritePublishAfter(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterPublish)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerPublishEndEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerPublishEndEventData)kvp.Value).OperationId); | |||||
Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerPublishEndEventData)kvp.Value).Duration); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WritePublishErrorTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
var ex = new Exception("WritePublishErrorTest"); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerPublishErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan)); | |||||
s_diagnosticListener.WritePublishError(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerPublishErrorEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerPublishErrorEventData)kvp.Value).OperationId); | |||||
Assert.Equal(ex, ((BrokerPublishErrorEventData)kvp.Value).Exception); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteConsumeBeforeTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerConsumeEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow); | |||||
s_diagnosticListener.WriteConsumeBefore(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeConsume)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerConsumeEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerConsumeEventData)kvp.Value).OperationId); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteConsumeAfterTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerConsumeEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1)); | |||||
s_diagnosticListener.WriteConsumeAfter(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterConsume)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerConsumeEndEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerConsumeEndEventData)kvp.Value).OperationId); | |||||
Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerConsumeEndEventData)kvp.Value).Duration); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteConsumeErrorTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
var ex = new Exception("WriteConsumeErrorTest"); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
var eventData = new BrokerConsumeErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan)); | |||||
s_diagnosticListener.WriteConsumeError(eventData); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<BrokerConsumeErrorEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((BrokerConsumeErrorEventData)kvp.Value).OperationId); | |||||
Assert.Equal(ex, ((BrokerConsumeErrorEventData)kvp.Value).Exception); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteSubscriberInvokeBeforeTest() | |||||
{ | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
s_diagnosticListener.WriteSubscriberInvokeBefore(FackConsumerContext()); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeSubscriberInvoke)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<SubscriberInvokeEventData>(kvp.Value); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteSubscriberInvokeAfterTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, FackConsumerContext(), DateTimeOffset.Now, TimeSpan.FromMinutes(1)); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterSubscriberInvoke)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<SubscriberInvokeEndEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((SubscriberInvokeEndEventData)kvp.Value).OperationId); | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void WriteSubscriberInvokeErrorTest() | |||||
{ | |||||
Guid operationId = Guid.NewGuid(); | |||||
var ex = new Exception("WriteConsumeErrorTest"); | |||||
DiagnosticsWapper(() => | |||||
{ | |||||
s_diagnosticListener.WriteSubscriberInvokeError(operationId, FackConsumerContext(), ex, | |||||
DateTimeOffset.Now, TimeSpan.MaxValue); | |||||
}, kvp => | |||||
{ | |||||
if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorSubscriberInvoke)) | |||||
{ | |||||
Assert.NotNull(kvp.Value); | |||||
Assert.IsType<SubscriberInvokeErrorEventData>(kvp.Value); | |||||
Assert.Equal(operationId, ((SubscriberInvokeErrorEventData)kvp.Value).OperationId); | |||||
Assert.Equal(ex, ((SubscriberInvokeErrorEventData)kvp.Value).Exception); | |||||
} | |||||
}); | |||||
} | |||||
private ConsumerContext FackConsumerContext() | |||||
{ | |||||
//Mock description | |||||
var description = new ConsumerExecutorDescriptor | |||||
{ | |||||
MethodInfo = GetType().GetMethod("WriteSubscriberInvokeAfterTest"), | |||||
Attribute = new CapSubscribeAttribute("xxx"), | |||||
ImplTypeInfo = GetType().GetTypeInfo() | |||||
}; | |||||
//Mock messageContext | |||||
var messageContext = new MessageContext | |||||
{ | |||||
Name= "Name", | |||||
Group= "Group", | |||||
Content = "Content" | |||||
}; | |||||
return new ConsumerContext(description, messageContext); | |||||
} | |||||
private void DiagnosticsWapper(Action operation, Action<KeyValuePair<string, object>> assert, [CallerMemberName]string methodName = "") | |||||
{ | |||||
FakeDiagnosticListenerObserver diagnosticListenerObserver = new FakeDiagnosticListenerObserver(assert); | |||||
diagnosticListenerObserver.Enable(); | |||||
using (DiagnosticListener.AllListeners.Subscribe(diagnosticListenerObserver)) | |||||
{ | |||||
Console.WriteLine(string.Format("Test: {0} Enabled Listeners", methodName)); | |||||
operation(); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -1,21 +1,19 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors> | |||||
<AssemblyName>DotNetCore.CAP.Test</AssemblyName> | |||||
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles> | |||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.6.2" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.2" /> | |||||
<PackageReference Include="System.Data.Common" Version="4.3.0" /> | <PackageReference Include="System.Data.Common" Version="4.3.0" /> | ||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | <PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" /> | ||||
<PackageReference Include="xunit" Version="2.3.1" /> | <PackageReference Include="xunit" Version="2.3.1" /> | ||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.0.0" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.0" /> | |||||
<PackageReference Include="Moq" Version="4.8.2" /> | <PackageReference Include="Moq" Version="4.8.2" /> | ||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -0,0 +1,71 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.Text; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
namespace DotNetCore.CAP.Test | |||||
{ | |||||
public sealed class FakeDiagnosticListenerObserver : IObserver<DiagnosticListener> | |||||
{ | |||||
private class FakeDiagnosticSourceWriteObserver : IObserver<KeyValuePair<string, object>> | |||||
{ | |||||
private readonly Action<KeyValuePair<string, object>> _writeCallback; | |||||
public FakeDiagnosticSourceWriteObserver(Action<KeyValuePair<string, object>> writeCallback) | |||||
{ | |||||
_writeCallback = writeCallback; | |||||
} | |||||
public void OnCompleted() | |||||
{ | |||||
} | |||||
public void OnError(Exception error) | |||||
{ | |||||
} | |||||
public void OnNext(KeyValuePair<string, object> value) | |||||
{ | |||||
_writeCallback(value); | |||||
} | |||||
} | |||||
private readonly Action<KeyValuePair<string, object>> _writeCallback; | |||||
private bool _writeObserverEnabled; | |||||
public FakeDiagnosticListenerObserver(Action<KeyValuePair<string, object>> writeCallback) | |||||
{ | |||||
_writeCallback = writeCallback; | |||||
} | |||||
public void OnCompleted() | |||||
{ | |||||
} | |||||
public void OnError(Exception error) | |||||
{ | |||||
} | |||||
public void OnNext(DiagnosticListener value) | |||||
{ | |||||
if (value.Name.Equals(CapDiagnosticListenerExtensions.DiagnosticListenerName)) | |||||
{ | |||||
value.Subscribe(new FakeDiagnosticSourceWriteObserver(_writeCallback), IsEnabled); | |||||
} | |||||
} | |||||
public void Enable() | |||||
{ | |||||
_writeObserverEnabled = true; | |||||
} | |||||
public void Disable() | |||||
{ | |||||
_writeObserverEnabled = false; | |||||
} | |||||
private bool IsEnabled(string s) | |||||
{ | |||||
return _writeObserverEnabled; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,154 @@ | |||||
using System; | |||||
using System.Reflection; | |||||
using DotNetCore.CAP.Diagnostics; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using Newtonsoft.Json.Linq; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.Test | |||||
{ | |||||
public class HelperTest | |||||
{ | |||||
[Fact] | |||||
public void ToTimestampTest() | |||||
{ | |||||
//Arrange | |||||
var time = DateTime.Parse("2018-01-01 00:00:00"); | |||||
//Act | |||||
var result = Helper.ToTimestamp(time); | |||||
//Assert | |||||
Assert.Equal(1514764800, result); | |||||
} | |||||
[Fact] | |||||
public void FromTimestampTest() | |||||
{ | |||||
//Arrange | |||||
var time = DateTime.Parse("2018-01-01 00:00:00"); | |||||
//Act | |||||
var result = Helper.FromTimestamp(1514764800); | |||||
//Assert | |||||
Assert.Equal(time, result); | |||||
} | |||||
[Fact] | |||||
public void IsControllerTest() | |||||
{ | |||||
//Arrange | |||||
var typeInfo = typeof(HomeController).GetTypeInfo(); | |||||
//Act | |||||
var result = Helper.IsController(typeInfo); | |||||
//Assert | |||||
Assert.True(result); | |||||
} | |||||
[Theory] | |||||
[InlineData(typeof(string))] | |||||
[InlineData(typeof(decimal))] | |||||
[InlineData(typeof(DateTime))] | |||||
[InlineData(typeof(DateTimeOffset))] | |||||
[InlineData(typeof(Guid))] | |||||
[InlineData(typeof(TimeSpan))] | |||||
[InlineData(typeof(Uri))] | |||||
public void IsSimpleTypeTest(Type type) | |||||
{ | |||||
//Act | |||||
var result = Helper.IsComplexType(type); | |||||
//Assert | |||||
Assert.False(result); | |||||
} | |||||
[Theory] | |||||
[InlineData(typeof(HomeController))] | |||||
[InlineData(typeof(Exception))] | |||||
[InlineData(typeof(Person))] | |||||
public void IsComplexTypeTest(Type type) | |||||
{ | |||||
//Act | |||||
var result = Helper.IsComplexType(type); | |||||
//Assert | |||||
Assert.True(result); | |||||
} | |||||
[Fact] | |||||
public void AddExceptionPropertyTest() | |||||
{ | |||||
//Arrange | |||||
var json = "{}"; | |||||
var exception = new Exception("Test Exception Message") | |||||
{ | |||||
Source = "Test Source", | |||||
InnerException = { } | |||||
}; | |||||
var expected = new | |||||
{ | |||||
ExceptionMessage = new | |||||
{ | |||||
Source = "Test Source", | |||||
Message = "Test Exception Message", | |||||
InnerMessage = new { } | |||||
} | |||||
}; | |||||
//Act | |||||
var result = Helper.AddExceptionProperty(json, exception); | |||||
//Assert | |||||
var jObj = JObject.Parse(result); | |||||
Assert.Equal(jObj["ExceptionMessage"]["Source"].Value<string>(), expected.ExceptionMessage.Source); | |||||
Assert.Equal(jObj["ExceptionMessage"]["Message"].Value<string>(), expected.ExceptionMessage.Message); | |||||
} | |||||
[Theory] | |||||
[InlineData("10.0.0.1")] | |||||
[InlineData("172.16.0.1")] | |||||
[InlineData("192.168.1.1")] | |||||
public void IsInnerIPTest(string ipAddress) | |||||
{ | |||||
Assert.True(Helper.IsInnerIP(ipAddress)); | |||||
} | |||||
[Fact] | |||||
public void AddTracingHeaderPropertyTest() | |||||
{ | |||||
//Arrange | |||||
var json = "{}"; | |||||
var header = new TracingHeaders { { "key", "value" } }; | |||||
//Act | |||||
var result = Helper.AddTracingHeaderProperty(json, header); | |||||
//Assert | |||||
var expected = "{\"TracingHeaders\":{\"key\":\"value\"}}"; | |||||
Assert.Equal(expected, result); | |||||
} | |||||
[Fact] | |||||
public void TryExtractTracingHeadersTest() | |||||
{ | |||||
//Arrange | |||||
var json = "{\"TracingHeaders\":{\"key\":\"value\"}}"; | |||||
TracingHeaders header = null; | |||||
string removedHeadersJson = ""; | |||||
//Act | |||||
var result = Helper.TryExtractTracingHeaders(json, out header, out removedHeadersJson); | |||||
//Assert | |||||
Assert.True(result); | |||||
Assert.NotNull(header); | |||||
Assert.Single(header); | |||||
Assert.Equal("{}", removedHeadersJson); | |||||
} | |||||
} | |||||
} |
@@ -1,83 +0,0 @@ | |||||
//using System; | |||||
//using System.Threading; | |||||
//using System.Threading.Tasks; | |||||
//using DotNetCore.CAP.Models; | |||||
//using DotNetCore.CAP.Processor; | |||||
//using Microsoft.Extensions.DependencyInjection; | |||||
//using Microsoft.Extensions.Options; | |||||
//using Moq; | |||||
//using Xunit; | |||||
//namespace DotNetCore.CAP.Test | |||||
//{ | |||||
// public class DefaultDispatcherTest | |||||
// { | |||||
// private CancellationTokenSource _cancellationTokenSource; | |||||
// private ProcessingContext _context; | |||||
// private IServiceProvider _provider; | |||||
// private Mock<IStorageConnection> _mockStorageConnection; | |||||
// public DefaultDispatcherTest() | |||||
// { | |||||
// _mockStorageConnection = new Mock<IStorageConnection>(); | |||||
// _cancellationTokenSource = new CancellationTokenSource(); | |||||
// var services = new ServiceCollection(); | |||||
// services.AddLogging(); | |||||
// services.Configure<IOptions<CapOptions>>(x => { }); | |||||
// services.AddOptions(); | |||||
// services.AddSingleton(_mockStorageConnection.Object); | |||||
// _provider = services.BuildServiceProvider(); | |||||
// _context = new ProcessingContext(_provider, _cancellationTokenSource.Token); | |||||
// } | |||||
// [Fact] | |||||
// public void MockTest() | |||||
// { | |||||
// Assert.NotNull(_provider.GetServices<IStorageConnection>()); | |||||
// } | |||||
// [Fact] | |||||
// public async void ProcessAsync_CancellationTokenCancelled_ThrowsImmediately() | |||||
// { | |||||
// // Arrange | |||||
// _cancellationTokenSource.Cancel(); | |||||
// var fixture = Create(); | |||||
// // Act | |||||
// await Assert.ThrowsAsync<OperationCanceledException>(() => fixture.ProcessAsync(_context)); | |||||
// } | |||||
// [Fact] | |||||
// public async Task ProcessAsync() | |||||
// { | |||||
// // Arrange | |||||
// var job = new CapPublishedMessage | |||||
// { | |||||
// }; | |||||
// var mockFetchedJob = Mock.Get(Mock.Of<IFetchedMessage>(fj => fj.MessageId == 42 && fj.MessageType == MessageType.Publish)); | |||||
// _mockStorageConnection | |||||
// .Setup(m => m.FetchNextMessageAsync()) | |||||
// .ReturnsAsync(mockFetchedJob.Object).Verifiable(); | |||||
// _mockQueueExecutor | |||||
// .Setup(x => x.ExecuteAsync(_mockStorageConnection.Object, mockFetchedJob.Object)) | |||||
// .Returns(Task.FromResult(OperateResult.Success)); | |||||
// var fixture = Create(); | |||||
// // Act | |||||
// await fixture.ProcessAsync(_context); | |||||
// // Assert | |||||
// _mockStorageConnection.VerifyAll(); | |||||
// } | |||||
// private DefaultDispatcher Create() | |||||
// => _provider.GetService<DefaultDispatcher>(); | |||||
// } | |||||
//} |