diff --git a/.github/ISSUE_TEMPLATE b/.github/ISSUE_TEMPLATE
new file mode 100644
index 0000000..66cdc24
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE
@@ -0,0 +1,40 @@
+
+
+Please answer these questions before submitting your issue.
+
+- Why do you submit this issue?
+- [ ] Question or discussion
+- [ ] Bug
+- [ ] Requirement
+- [ ] Feature or performance improvement
+
+___
+### Question
+- What do you want to know?
+
+___
+### Bug
+- Which version of CAP, OS and .NET Core?
+
+- Which company or project?
+
+- What happen?
+If possible, provide a way for reproducing the error. e.g. demo application, component version.
+
+___
+### Requirement or improvement
+- Please describe about your requirements or improvement suggestions.
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 4d8bd00..ba8c1fe 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,9 @@
-language: cpp
+language: csharp
sudo: required
dist: trusty
+solution: CAP.sln
+dotnet: 2.1.300
+mono: none
matrix:
include:
@@ -10,33 +13,9 @@ matrix:
- os: osx
osx_image: xcode8.3 # macOS 10.12
-env:
- global:
- - DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true
- - DOTNET_CLI_TELEMETRY_OPTOUT: 1
- - CLI_VERSION=2.0.0
-
-addons:
- apt:
- packages:
- - gettext
- - libcurl4-openssl-dev
- - libicu-dev
- - libssl-dev
- - libunwind8
- - zlib1g
-
-# Make sure build dependencies are installed.
-before_install:
- - if test "$TRAVIS_OS_NAME" == "osx"; then ln -s /usr/local/opt/openssl/lib/libcrypto.1.0.0.dylib /usr/local/lib/; ln -s /usr/local/opt/openssl/lib/libssl.1.0.0.dylib /usr/local/lib/; fi
- - export DOTNET_INSTALL_DIR="$PWD/.dotnetcli"
- - export PATH="$DOTNET_INSTALL_DIR:$PATH"
-
-install:
- - travis_retry curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin --channel 2.0 --version "$CLI_VERSION" --install-dir "$DOTNET_INSTALL_DIR"
-
# Run the build script
script:
- dotnet --info
- - dotnet restore
- - dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj -f netcoreapp2.0
+ - dotnet restore CAP.sln
+ - dotnet build CAP.sln
+ - dotnet test test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
diff --git a/CAP.sln b/CAP.sln
index 952416d..225537c 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
CHANGELOG.md = CHANGELOG.md
CODE_OF_CONDUCT.md = CODE_OF_CONDUCT.md
ConfigureMSDTC.ps1 = ConfigureMSDTC.ps1
+ .github\ISSUE_TEMPLATE = .github\ISSUE_TEMPLATE
LICENSE.txt = LICENSE.txt
README.md = README.md
README.zh-cn.md = README.zh-cn.md
@@ -35,7 +36,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{10C0818D
build.cake = build.cake
build.ps1 = build.ps1
build.sh = build.sh
- build\common.props = build\common.props
build\index.cake = build\index.cake
build\util.cake = build\util.cake
build\version.cake = build\version.cake
diff --git a/Directory.Build.props b/Directory.Build.props
new file mode 100644
index 0000000..c4ae862
--- /dev/null
+++ b/Directory.Build.props
@@ -0,0 +1,18 @@
+
+
+
+
+
+ CAP
+ .NET Core Community;Savorboard
+ https://github.com/dotnetcore/CAP
+ git
+ $(MSBuildThisFileDirectory)
+ https://avatars2.githubusercontent.com/u/19404084
+ https://github.com/dotnetcore/CAP
+ https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt
+ CAP;EventBus;Distributed Transaction
+ EventBus and eventually consistency in distributed architectures.
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index cd25182..4dbadde 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
-[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
+[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently.
diff --git a/README.zh-cn.md b/README.zh-cn.md
index 8ceb14b..639e4c5 100644
--- a/README.zh-cn.md
+++ b/README.zh-cn.md
@@ -3,7 +3,7 @@
[![AppVeyor](https://ci.appveyor.com/api/projects/status/4mpe0tbu7n126vyw?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
[![NuGet Preview](https://img.shields.io/nuget/vpre/DotNetCore.CAP.svg?label=nuget-pre)](https://www.nuget.org/packages/DotNetCore.CAP/)
-[![Member project of .NET China Foundation](https://img.shields.io/badge/member_project_of-.NET_CHINA-red.svg?style=flat&colorB=9E20C8)](https://github.com/dotnetcore)
+[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
diff --git a/build/common.props b/build/common.props
deleted file mode 100644
index 80c2d79..0000000
--- a/build/common.props
+++ /dev/null
@@ -1,17 +0,0 @@
-
-
-
-
-
- CAP
- savorboard;dotnetcore
- https://github.com/dotnetcore/CAP
- git
- https://avatars2.githubusercontent.com/u/19404084
- https://github.com/dotnetcore/CAP
- https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt
- CAP;EventBus;Distributed Transaction
- EventBus and eventually consistency in distributed architectures.
-
-
-
diff --git a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
index 6f04946..21510bb 100644
--- a/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
+++ b/samples/Sample.Kafka.MySql/Controllers/ValuesController.cs
@@ -34,7 +34,7 @@ namespace Sample.Kafka.MySql.Controllers
return Ok("publish successful!");
}
- [CapSubscribe("xxx.xxx.test2")]
+ [CapSubscribe("#.test2")]
public void Test2(int value)
{
Console.WriteLine("Subscriber output message: " + value);
diff --git a/samples/Sample.Kafka.MySql/Startup.cs b/samples/Sample.Kafka.MySql/Startup.cs
index 419e48e..59c4636 100644
--- a/samples/Sample.Kafka.MySql/Startup.cs
+++ b/samples/Sample.Kafka.MySql/Startup.cs
@@ -10,8 +10,8 @@ namespace Sample.Kafka.MySql
{
services.AddCap(x =>
{
- x.UseMySql("Server=192.168.10.110;Database=testcap;UserId=root;Password=123123;");
- x.UseKafka("192.168.10.110:9092");
+ x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;");
+ x.UseKafka("localhost:9092");
x.UseDashboard();
});
diff --git a/samples/Sample.Kafka.MySql/appsettings.json b/samples/Sample.Kafka.MySql/appsettings.json
new file mode 100644
index 0000000..20aa907
--- /dev/null
+++ b/samples/Sample.Kafka.MySql/appsettings.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "IncludeScopes": false,
+ "LogLevel": {
+ "Default": "Debug"
+ }
+ }
+}
diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
index 1b47e5e..03f33c0 100644
--- a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
+++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
@@ -25,7 +25,6 @@ namespace Sample.RabbitMQ.MySql.Controllers
return Ok();
}
-
[Route("~/publish2")]
public IActionResult PublishMessage2()
{
@@ -47,7 +46,7 @@ namespace Sample.RabbitMQ.MySql.Controllers
}
[NonAction]
- [CapSubscribe("sample.rabbitmq.mysql")]
+ [CapSubscribe("#.rabbitmq.mysql")]
public void ReceiveMessage(DateTime time)
{
Console.WriteLine("[sample.rabbitmq.mysql] message received: " + DateTime.Now + ",sent time: " + time);
diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs
index 6525770..554a624 100644
--- a/samples/Sample.RabbitMQ.MySql/Startup.cs
+++ b/samples/Sample.RabbitMQ.MySql/Startup.cs
@@ -23,9 +23,6 @@ namespace Sample.RabbitMQ.MySql
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
- loggerFactory.AddConsole();
- loggerFactory.AddDebug();
-
app.UseMvc();
app.UseCap();
diff --git a/samples/Sample.RabbitMQ.MySql/appsettings.json b/samples/Sample.RabbitMQ.MySql/appsettings.json
new file mode 100644
index 0000000..20aa907
--- /dev/null
+++ b/samples/Sample.RabbitMQ.MySql/appsettings.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "IncludeScopes": false,
+ "LogLevel": {
+ "Default": "Debug"
+ }
+ }
+}
diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
index 72e580e..8c8d5e7 100644
--- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
+++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
@@ -1,7 +1,5 @@
-
-
netstandard2.0
DotNetCore.CAP.Kafka
diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
index 1993661..21b8398 100644
--- a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
+++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
@@ -6,23 +6,29 @@ using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka
{
public class ConnectionPool : IConnectionPool, IDisposable
{
+ private readonly ILogger _logger;
private readonly Func _activator;
- private readonly ConcurrentQueue _pool = new ConcurrentQueue();
+ private readonly ConcurrentQueue _pool;
private int _count;
-
private int _maxSize;
- public ConnectionPool(KafkaOptions options)
+ public ConnectionPool(ILogger logger, KafkaOptions options)
{
+ _logger = logger;
+ _pool = new ConcurrentQueue();
_maxSize = options.ConnectionPoolSize;
_activator = CreateActivator(options);
-
ServersAddress = options.Servers;
+
+ _logger.LogDebug("Kafka configuration of CAP :\r\n {0}",
+ JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
}
public string ServersAddress { get; }
diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
index 0bf0c2b..0c58540 100644
--- a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddScoped();
services.AddScoped();
- services.AddTransient();
+ services.AddTransient();
AddSingletionMySqlOptions(services);
}
diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
index 38482c6..14734ad 100644
--- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
+++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
@@ -1,7 +1,5 @@
-
-
netstandard2.0
DotNetCore.CAP.MySql
@@ -14,10 +12,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
similarity index 88%
rename from src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs
rename to src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
index fd892cb..4cd0a23 100644
--- a/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs
+++ b/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
@@ -10,7 +10,7 @@ using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
- internal class DefaultAdditionalProcessor : IAdditionalProcessor
+ internal class MySqlCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
@@ -18,7 +18,7 @@ namespace DotNetCore.CAP.MySql
private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
- public DefaultAdditionalProcessor(ILogger logger,
+ public MySqlCollectProcessor(ILogger logger,
MySqlOptions mysqlOptions)
{
_logger = logger;
@@ -27,8 +27,6 @@ namespace DotNetCore.CAP.MySql
public async Task ProcessAsync(ProcessingContext context)
{
- _logger.LogDebug("Collecting expired entities.");
-
var tables = new[]
{
$"{_options.TableNamePrefix}.published",
@@ -37,6 +35,8 @@ namespace DotNetCore.CAP.MySql
foreach (var table in tables)
{
+ _logger.LogDebug($"Collecting expired data from table [{table}].");
+
int removedCount;
do
{
diff --git a/src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs b/src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs
index 6fd0888..a8a4bdf 100644
--- a/src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlMonitoringApi.cs
@@ -126,7 +126,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
{
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
- var count = connection.ExecuteScalar(sqlQuery, new {state = statusName});
+ var count = connection.ExecuteScalar(sqlQuery, new { state = statusName });
return count;
}
@@ -167,10 +167,10 @@ select aggr.* from (
group by date_format(`Added`,'%Y-%m-%d-%H')
) aggr where `Key` in @keys;";
- var valuesMap = connection.Query(
+ var valuesMap = connection.Query(
sqlQuery,
- new {keys = keyMaps.Keys, statusName})
- .ToDictionary(x => (string) x.Key, x => (int) x.Count);
+ new { keys = keyMaps.Keys, statusName })
+ .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
{
diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
index 62af675..edc1729 100644
--- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddScoped();
services.AddScoped();
- services.AddTransient();
+ services.AddTransient();
AddSingletonPostgreSqlOptions(services);
}
diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
index 81b1b5c..0e3698f 100644
--- a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
+++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
@@ -1,7 +1,5 @@
-
-
netstandard2.0
DotNetCore.CAP.PostgreSql
@@ -14,10 +12,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
similarity index 87%
rename from src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs
rename to src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
index ef606eb..d82f0b8 100644
--- a/src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs
+++ b/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
@@ -10,7 +10,7 @@ using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
- internal class DefaultAdditionalProcessor : IAdditionalProcessor
+ internal class PostgreSqlCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.PostgreSql
private readonly PostgreSqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
- public DefaultAdditionalProcessor(ILogger logger,
+ public PostgreSqlCollectProcessor(ILogger logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.PostgreSql
public async Task ProcessAsync(ProcessingContext context)
{
- _logger.LogDebug("Collecting expired entities.");
-
foreach (var table in Tables)
{
+ _logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
+
var removedCount = 0;
do
{
diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs
index 8fcf9f8..2c73bff 100644
--- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs
+++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs
@@ -128,7 +128,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed'
var sqlQuery =
$"select count(\"Id\") from \"{_options.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)";
- var count = connection.ExecuteScalar(sqlQuery, new {state = statusName});
+ var count = connection.ExecuteScalar(sqlQuery, new { state = statusName });
return count;
}
@@ -170,9 +170,9 @@ with aggr as (
)
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);";
- var valuesMap = connection.Query(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName})
+ var valuesMap = connection.Query(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName })
.ToList()
- .ToDictionary(x => (string) x.Key, x => (int) x.Count);
+ .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
{
diff --git a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
index 6a68173..deccfb7 100644
--- a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
+++ b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
@@ -1,7 +1,5 @@
-
-
netstandard2.0
DotNetCore.CAP.RabbitMQ
diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
index e650518..e15fdfa 100644
--- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
@@ -6,6 +6,7 @@ using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
@@ -15,21 +16,24 @@ namespace DotNetCore.CAP.RabbitMQ
private const int DefaultPoolSize = 15;
private readonly Func _connectionActivator;
private readonly ILogger _logger;
- private readonly ConcurrentQueue _pool = new ConcurrentQueue();
+ private readonly ConcurrentQueue _pool;
private IConnection _connection;
private int _count;
private int _maxSize;
- public ConnectionChannelPool(ILogger logger,
- RabbitMQOptions options)
+ public ConnectionChannelPool(ILogger logger, RabbitMQOptions options)
{
_logger = logger;
_maxSize = DefaultPoolSize;
-
+ _pool = new ConcurrentQueue();
_connectionActivator = CreateConnection(options);
+
HostAddress = options.HostName + ":" + options.Port;
Exchange = options.ExchangeName;
+
+ _logger.LogDebug("RabbitMQ configuration of CAP :\r\n {0}",
+ JsonConvert.SerializeObject(options, Formatting.Indented));
}
IModel IConnectionChannelPool.Rent()
@@ -87,7 +91,7 @@ namespace DotNetCore.CAP.RabbitMQ
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
- _logger.LogWarning($"RabbitMQ client connection closed! {e}");
+ _logger.LogWarning($"RabbitMQ client connection closed! --> {e.ReplyText}");
}
public virtual IModel Rent()
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
index a34054f..972b6dd 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
@@ -93,7 +93,7 @@ namespace DotNetCore.CAP.RabbitMQ
_connection = _connectionChannelPool.GetConnection();
_channel = _connection.CreateModel();
-
+
_channel.ExchangeDeclare(
_exchageName,
RabbitMQOptions.ExchangeType,
@@ -155,7 +155,7 @@ namespace DotNetCore.CAP.RabbitMQ
var args = new LogMessageEventArgs
{
LogType = MqLogType.ConsumerShutdown,
- Reason = e.ToString()
+ Reason = e.ReplyText
};
OnLog?.Invoke(sender, args);
}
diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
index d536647..91e04f7 100644
--- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
@@ -26,7 +26,7 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddScoped();
services.AddScoped();
- services.AddTransient();
+ services.AddTransient();
AddSqlServerOptions(services);
}
diff --git a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
index 7d03a24..27d3ca2 100644
--- a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
+++ b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
@@ -1,11 +1,10 @@
-
-
netstandard2.0
DotNetCore.CAP.SqlServer
$(PackageTags);SQL Server
+
@@ -14,10 +13,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.SqlServer/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
similarity index 87%
rename from src/DotNetCore.CAP.SqlServer/IAdditionalProcessor.Default.cs
rename to src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
index 8e344d6..c7a4d03 100644
--- a/src/DotNetCore.CAP.SqlServer/IAdditionalProcessor.Default.cs
+++ b/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
@@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.SqlServer
{
- public class DefaultAdditionalProcessor : IAdditionalProcessor
+ public class SqlServerCollectProcessor : ICollectProcessor
{
private const int MaxBatch = 1000;
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
- public DefaultAdditionalProcessor(ILogger logger,
+ public SqlServerCollectProcessor(ILogger logger,
SqlServerOptions sqlServerOptions)
{
_logger = logger;
@@ -33,10 +33,10 @@ namespace DotNetCore.CAP.SqlServer
public async Task ProcessAsync(ProcessingContext context)
{
- _logger.LogDebug("Collecting expired entities.");
-
foreach (var table in Tables)
{
+ _logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}].");
+
int removedCount;
do
{
diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs
index 732143e..31c4bcd 100644
--- a/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs
+++ b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs
@@ -128,7 +128,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';
var sqlQuery =
$"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state";
- var count = connection.ExecuteScalar(sqlQuery, new {state = statusName});
+ var count = connection.ExecuteScalar(sqlQuery, new { state = statusName });
return count;
}
@@ -171,10 +171,10 @@ with aggr as (
)
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";
- var valuesMap = connection.Query(
+ var valuesMap = connection.Query(
sqlQuery,
- new {keys = keyMaps.Keys, statusName})
- .ToDictionary(x => (string) x.Key, x => (int) x.Count);
+ new { keys = keyMaps.Keys, statusName })
+ .ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
{
diff --git a/src/DotNetCore.CAP/Dashboard/TimelineCounter.cs b/src/DotNetCore.CAP/Dashboard/TimelineCounter.cs
new file mode 100644
index 0000000..c4c1a0b
--- /dev/null
+++ b/src/DotNetCore.CAP/Dashboard/TimelineCounter.cs
@@ -0,0 +1,8 @@
+namespace DotNetCore.CAP.Dashboard
+{
+ public class TimelineCounter
+ {
+ public string Key { get; set; }
+ public int Count { get; set; }
+ }
+}
diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
index 464a2a5..cc1a94d 100644
--- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj
+++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
@@ -1,32 +1,14 @@
-
-
+
+
netstandard2.0
- DotNetCore.CAP
- $(PackageTags);
+
bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml
1701;1702;1705;CS1591
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -47,19 +29,21 @@
+
-
-
-
-
+
+
+
+
-
+
-
-
+
+
+
True
@@ -143,9 +127,4 @@
Strings.Designer.cs
-
-
- RazorGenerator
-
-
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs
index d89986a..9af8f2e 100644
--- a/src/DotNetCore.CAP/IBootstrapper.Default.cs
+++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs
@@ -59,6 +59,8 @@ namespace DotNetCore.CAP
private async Task BootstrapTaskAsync()
{
+ _logger.LogInformation("### CAP starting...");
+
await Storage.InitializeAsync(_cts.Token);
if (_cts.IsCancellationRequested)
@@ -83,6 +85,8 @@ namespace DotNetCore.CAP
_ctsRegistration.Dispose();
_cts.Dispose();
+
+ _logger.LogInformation("### CAP started!");
}
protected virtual Task BootstrapCoreAsync()
diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
index 1d441ff..e2179ff 100644
--- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
@@ -146,22 +146,22 @@ namespace DotNetCore.CAP
switch (logmsg.LogType)
{
case MqLogType.ConsumerCancelled:
- _logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason);
+ _logger.LogWarning("RabbitMQ consumer cancelled. --> " + logmsg.Reason);
break;
case MqLogType.ConsumerRegistered:
- _logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason);
+ _logger.LogInformation("RabbitMQ consumer registered. --> " + logmsg.Reason);
break;
case MqLogType.ConsumerUnregistered:
- _logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason);
+ _logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason);
break;
case MqLogType.ConsumerShutdown:
- _logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason);
+ _logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason);
break;
case MqLogType.ConsumeError:
- _logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason);
+ _logger.LogError("Kakfa client consume error. --> " + logmsg.Reason);
break;
case MqLogType.ServerConnError:
- _logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason);
+ _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason);
break;
default:
throw new ArgumentOutOfRangeException();
diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
index 92f988e..fe03a15 100644
--- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
+++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
@@ -72,7 +72,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
- _logger.ExceptionOccuredWhileExecuting(message.Name, ex);
+ _logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");
await SetFailedState(message, ex, out bool stillRetry);
if (stillRetry)
diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
index 7bf92f3..f751499 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
+using System.Text.RegularExpressions;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
@@ -19,6 +20,8 @@ namespace DotNetCore.CAP.Internal
{
private readonly CapOptions _capOptions;
private readonly IServiceProvider _serviceProvider;
+ private List> _asteriskList;
+ private List> _poundList;
///
/// Creates a new .
@@ -29,17 +32,6 @@ namespace DotNetCore.CAP.Internal
_capOptions = capOptions;
}
- ///
- /// Selects the best candidate from for
- /// the
- /// current message associated.
- ///
- public ConsumerExecutorDescriptor SelectBestCandidate(string key,
- IReadOnlyList executeDescriptor)
- {
- return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
- }
-
public IReadOnlyList SelectCandidates()
{
var executorDescriptorList = new List();
@@ -51,6 +43,26 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
+ public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList executeDescriptor)
+ {
+ var result = MatchUsingName(key, executeDescriptor);
+ if (result != null)
+ {
+ return result;
+ }
+
+ //[*] match with regex, i.e. foo.*.abc
+ result = MatchAsteriskUsingRegex(key, executeDescriptor);
+ if (result != null)
+ {
+ return result;
+ }
+
+ //[#] match regex, i.e. foo.#
+ result = MatchPoundUsingRegex(key, executeDescriptor);
+ return result;
+ }
+
private IEnumerable FindConsumersFromInterfaceTypes(
IServiceProvider provider)
{
@@ -130,5 +142,65 @@ namespace DotNetCore.CAP.Internal
return descriptor;
}
+
+ private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList executeDescriptor)
+ {
+ return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
+ }
+
+ private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList executeDescriptor)
+ {
+ if (_asteriskList == null)
+ {
+ _asteriskList = executeDescriptor
+ .Where(x => x.Attribute.Name.IndexOf('*') >= 0)
+ .Select(x => new RegexExecuteDescriptor
+ {
+ Name = ("^" + x.Attribute.Name + "$").Replace("*", "[a-zA-Z]+").Replace(".", "\\."),
+ Descriptor = x
+ }).ToList();
+ }
+ foreach (var red in _asteriskList)
+ {
+ if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
+ {
+ return red.Descriptor;
+ }
+ }
+
+ return null;
+ }
+
+ private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList executeDescriptor)
+ {
+ if (_poundList == null)
+ {
+ _poundList = executeDescriptor
+ .Where(x => x.Attribute.Name.IndexOf('#') >= 0)
+ .Select(x => new RegexExecuteDescriptor
+ {
+ Name = ("^" + x.Attribute.Name + "$").Replace("#", "[a-zA-Z\\.]+"),
+ Descriptor = x
+ }).ToList();
+ }
+
+ foreach (var red in _poundList)
+ {
+ if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
+ {
+ return red.Descriptor;
+ }
+ }
+
+ return null;
+ }
+
+
+ private class RegexExecuteDescriptor
+ {
+ public string Name { get; set; }
+
+ public T Descriptor { get; set; }
+ }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs
index 3c1dc94..0b53af7 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs
@@ -22,8 +22,6 @@ namespace DotNetCore.CAP.Internal
///
/// topic or exchange router key.
/// the set of candidates.
- ///
- ConsumerExecutorDescriptor
- SelectBestCandidate(string key, IReadOnlyList candidates);
+ ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList candidates);
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
index d6b40ba..8ff25c7 100644
--- a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
+++ b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
@@ -11,21 +11,20 @@ namespace DotNetCore.CAP.Internal
internal class MethodMatcherCache
{
private readonly IConsumerServiceSelector _selector;
- private List _allTopics;
public MethodMatcherCache(IConsumerServiceSelector selector)
{
_selector = selector;
- Entries = new ConcurrentDictionary>();
+ Entries = new ConcurrentDictionary>();
}
- private ConcurrentDictionary> Entries { get; }
+ private ConcurrentDictionary> Entries { get; }
///
/// Get a dictionary of candidates.In the dictionary,
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates
///
- public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped()
+ public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped()
{
if (Entries.Count != 0)
{
@@ -44,28 +43,6 @@ namespace DotNetCore.CAP.Internal
return Entries;
}
- ///
- /// Get a dictionary of specify topic candidates.
- /// The Key is Group name, the value is specify topic candidates.
- ///
- /// message topic name
- public IDictionary> GetTopicExector(string topicName)
- {
- if (Entries == null)
- {
- throw new ArgumentNullException(nameof(Entries));
- }
-
- var dic = new Dictionary>();
- foreach (var item in Entries)
- {
- var topicCandidates = item.Value.Where(x => x.Attribute.Name == topicName);
- dic.Add(item.Key, topicCandidates.ToList());
- }
-
- return dic;
- }
-
///
/// Attempts to get the topic exector associated with the specified topic name and group name from the
/// .
@@ -86,36 +63,12 @@ namespace DotNetCore.CAP.Internal
if (Entries.TryGetValue(groupName, out var groupMatchTopics))
{
- matchTopic = groupMatchTopics.FirstOrDefault(x => x.Attribute.Name == topicName);
+ matchTopic = _selector.SelectBestCandidate(topicName, groupMatchTopics);
+
return matchTopic != null;
}
return false;
}
-
- ///
- /// Get all subscribe topics name.
- ///
- public IEnumerable GetSubscribeTopics()
- {
- if (_allTopics != null)
- {
- return _allTopics;
- }
-
- if (Entries == null)
- {
- throw new ArgumentNullException(nameof(Entries));
- }
-
- _allTopics = new List();
-
- foreach (var descriptors in Entries.Values)
- {
- _allTopics.AddRange(descriptors.Select(x => x.Attribute.Name));
- }
-
- return _allTopics;
- }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs b/src/DotNetCore.CAP/Processor/ICollectProcessor.cs
similarity index 78%
rename from src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs
rename to src/DotNetCore.CAP/Processor/ICollectProcessor.cs
index 51fc1c3..9fd2625 100644
--- a/src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs
+++ b/src/DotNetCore.CAP/Processor/ICollectProcessor.cs
@@ -3,7 +3,7 @@
namespace DotNetCore.CAP.Processor
{
- public interface IAdditionalProcessor : IProcessor
+ public interface ICollectProcessor : IProcessor
{
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
index 95e9cdb..433c01b 100644
--- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
+++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
@@ -65,7 +65,7 @@ namespace DotNetCore.CAP.Processor
}
catch (Exception ex)
{
- _logger.ExceptionOccuredWhileExecuting(message.Name, ex);
+ _logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Topic:{message.Name}, Id:{message.Id}");
}
}
}
@@ -82,14 +82,7 @@ namespace DotNetCore.CAP.Processor
{
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token))
{
- try
- {
- _executor.ExecuteAsync(message);
- }
- catch (Exception ex)
- {
- _logger.ExceptionOccuredWhileExecuting(message.Name, ex);
- }
+ _executor.ExecuteAsync(message);
}
}
catch (OperationCanceledException)
diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
index b9d8802..a05c302 100644
--- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
+++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
@@ -57,13 +57,14 @@ namespace DotNetCore.CAP.Processor
return;
}
- _disposed = true;
-
- _logger.ServerShuttingDown();
- _cts.Cancel();
try
{
- _compositeTask.Wait((int) TimeSpan.FromSeconds(10).TotalMilliseconds);
+ _disposed = true;
+
+ _logger.ServerShuttingDown();
+ _cts.Cancel();
+
+ _compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
}
catch (AggregateException ex)
{
@@ -73,6 +74,14 @@ namespace DotNetCore.CAP.Processor
_logger.ExpectedOperationCanceledException(innerEx);
}
}
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "An exception was occured when disposing.");
+ }
+ finally
+ {
+ _logger.LogInformation("### CAP shutdown!");
+ }
}
private IProcessor InfiniteRetry(IProcessor inner)
@@ -85,7 +94,7 @@ namespace DotNetCore.CAP.Processor
var returnedProcessors = new List
{
_provider.GetRequiredService(),
- _provider.GetRequiredService()
+ _provider.GetRequiredService()
};
return returnedProcessors.ToArray();
diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
index 31710c4..8678e45 100644
--- a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
+++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
@@ -1,11 +1,8 @@
- netcoreapp2.0
- true
- DotNetCore.CAP.MySql.Test
- DotNetCore.CAP.MySql.Test
- true
+ netcoreapp2.1
+ false
@@ -14,20 +11,20 @@
-
-
-
+
+
+
-
-
+
+
-
-
-
-
-
-
+
+
+
+
+
+
\ No newline at end of file
diff --git a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
index 2346ca5..335f851 100644
--- a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
+++ b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
@@ -1,15 +1,14 @@
-
+
- netcoreapp2.0
-
+ netcoreapp2.1
false
-
-
-
+
+
+
diff --git a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
index d8e70d4..9375d31 100644
--- a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
+++ b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.0
+ netcoreapp2.1
false
@@ -11,20 +11,20 @@
-
-
-
+
+
+
-
-
+
+
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
index 71154b5..8f255a4 100644
--- a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
+++ b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
@@ -28,7 +28,7 @@ namespace DotNetCore.CAP.Test
var selector = _provider.GetRequiredService();
var candidates = selector.SelectCandidates();
- Assert.Equal(2, candidates.Count);
+ Assert.Equal(6, candidates.Count);
}
[Fact]
@@ -42,6 +42,66 @@ namespace DotNetCore.CAP.Test
Assert.NotNull(bestCandidates.MethodInfo);
Assert.Equal(typeof(Task), bestCandidates.MethodInfo.ReturnType);
}
+
+
+ [Theory]
+ [InlineData("Candidates.Asterisk")]
+ [InlineData("candidates.Asterisk")]
+ [InlineData("AAA.BBB.Asterisk")]
+ [InlineData("aaa.bbb.Asterisk")]
+ public void CanFindAsteriskTopic(string topic)
+ {
+ var selector = _provider.GetRequiredService();
+ var candidates = selector.SelectCandidates();
+
+ var bestCandidates = selector.SelectBestCandidate(topic, candidates);
+ Assert.NotNull(bestCandidates);
+ }
+
+ [Theory]
+ [InlineData("Candidates.Asterisk.AAA")]
+ [InlineData("AAA.BBB.CCC.Asterisk")]
+ [InlineData("aaa.BBB.ccc.Asterisk")]
+ [InlineData("Asterisk.aaa.bbb")]
+ public void CanNotFindAsteriskTopic(string topic)
+ {
+ var selector = _provider.GetRequiredService();
+ var candidates = selector.SelectCandidates();
+
+ var bestCandidates = selector.SelectBestCandidate(topic, candidates);
+ Assert.Null(bestCandidates);
+ }
+
+ [Theory]
+ [InlineData("Candidates.Pound.AAA")]
+ [InlineData("Candidates.Pound.AAA.BBB")]
+ [InlineData("AAA.Pound")]
+ [InlineData("aaa.Pound")]
+ [InlineData("aaa.bbb.Pound")]
+ [InlineData("aaa.BBB.Pound")]
+ public void CanFindPoundTopic(string topic)
+ {
+ var selector = _provider.GetRequiredService();
+ var candidates = selector.SelectCandidates();
+
+ var bestCandidates = selector.SelectBestCandidate(topic, candidates);
+ Assert.NotNull(bestCandidates);
+ }
+
+ [Theory]
+ [InlineData("Pound")]
+ [InlineData("aaa.Pound.AAA.BBB")]
+ [InlineData("Pound.AAA")]
+ [InlineData("Pound.aaa")]
+ [InlineData("AAA.Pound.aaa")]
+ public void CanNotFindPoundTopic(string topic)
+ {
+ var selector = _provider.GetRequiredService();
+ var candidates = selector.SelectCandidates();
+
+ var bestCandidates = selector.SelectBestCandidate(topic, candidates);
+ Assert.Null(bestCandidates);
+ }
}
public class CandidatesTopic : TopicAttribute
@@ -73,6 +133,21 @@ namespace DotNetCore.CAP.Test
{
Console.WriteLine("GetFoo2() method has bee excuted.");
}
+
+ [CandidatesTopic("*.*.Asterisk")]
+ [CandidatesTopic("*.Asterisk")]
+ public void GetFooAsterisk()
+ {
+ Console.WriteLine("GetFoo2Asterisk() method has bee excuted.");
+ }
+
+ [CandidatesTopic("Candidates.Pound.#")]
+ [CandidatesTopic("#.Pound")]
+ public void GetFooPound()
+ {
+ Console.WriteLine("GetFoo2Pound() method has bee excuted.");
+ }
+
}
public class CandidatesBarTest : IBarTest
diff --git a/test/DotNetCore.CAP.Test/DiagnosticsTest.cs b/test/DotNetCore.CAP.Test/DiagnosticsTest.cs
new file mode 100644
index 0000000..b3473a9
--- /dev/null
+++ b/test/DotNetCore.CAP.Test/DiagnosticsTest.cs
@@ -0,0 +1,241 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using DotNetCore.CAP.Diagnostics;
+using DotNetCore.CAP.Internal;
+using Xunit;
+
+namespace DotNetCore.CAP.Test
+{
+
+ public class DiagnosticsTest
+ {
+ private static readonly DiagnosticListener s_diagnosticListener =
+ new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);
+
+ [Fact]
+ public void WritePublishBeforeTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerPublishEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow);
+ s_diagnosticListener.WritePublishBefore(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforePublish))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerPublishEventData)kvp.Value).OperationId);
+ }
+ });
+ }
+
+ [Fact]
+ public void WritePublishAfterTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerPublishEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1));
+ s_diagnosticListener.WritePublishAfter(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterPublish))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerPublishEndEventData)kvp.Value).OperationId);
+ Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerPublishEndEventData)kvp.Value).Duration);
+ }
+ });
+ }
+
+ [Fact]
+ public void WritePublishErrorTest()
+ {
+ Guid operationId = Guid.NewGuid();
+ var ex = new Exception("WritePublishErrorTest");
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerPublishErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan));
+ s_diagnosticListener.WritePublishError(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerPublishErrorEventData)kvp.Value).OperationId);
+ Assert.Equal(ex, ((BrokerPublishErrorEventData)kvp.Value).Exception);
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteConsumeBeforeTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerConsumeEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow);
+ s_diagnosticListener.WriteConsumeBefore(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeConsume))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerConsumeEventData)kvp.Value).OperationId);
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteConsumeAfterTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerConsumeEndEventData(operationId, "", "", "", "", DateTimeOffset.UtcNow, TimeSpan.FromMinutes(1));
+ s_diagnosticListener.WriteConsumeAfter(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterConsume))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerConsumeEndEventData)kvp.Value).OperationId);
+ Assert.Equal(TimeSpan.FromMinutes(1), ((BrokerConsumeEndEventData)kvp.Value).Duration);
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteConsumeErrorTest()
+ {
+ Guid operationId = Guid.NewGuid();
+ var ex = new Exception("WriteConsumeErrorTest");
+ DiagnosticsWapper(() =>
+ {
+ var eventData = new BrokerConsumeErrorEventData(operationId, "", "", "", "", ex, DateTimeOffset.UtcNow, default(TimeSpan));
+ s_diagnosticListener.WriteConsumeError(eventData);
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorPublish))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((BrokerConsumeErrorEventData)kvp.Value).OperationId);
+ Assert.Equal(ex, ((BrokerConsumeErrorEventData)kvp.Value).Exception);
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteSubscriberInvokeBeforeTest()
+ {
+ DiagnosticsWapper(() =>
+ {
+ s_diagnosticListener.WriteSubscriberInvokeBefore(FackConsumerContext());
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapBeforeSubscriberInvoke))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteSubscriberInvokeAfterTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ DiagnosticsWapper(() =>
+ {
+ s_diagnosticListener.WriteSubscriberInvokeAfter(operationId, FackConsumerContext(), DateTimeOffset.Now, TimeSpan.FromMinutes(1));
+
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapAfterSubscriberInvoke))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((SubscriberInvokeEndEventData)kvp.Value).OperationId);
+
+ }
+ });
+ }
+
+ [Fact]
+ public void WriteSubscriberInvokeErrorTest()
+ {
+ Guid operationId = Guid.NewGuid();
+
+ var ex = new Exception("WriteConsumeErrorTest");
+ DiagnosticsWapper(() =>
+ {
+ s_diagnosticListener.WriteSubscriberInvokeError(operationId, FackConsumerContext(), ex,
+ DateTimeOffset.Now, TimeSpan.MaxValue);
+ }, kvp =>
+ {
+ if (kvp.Key.Equals(CapDiagnosticListenerExtensions.CapErrorSubscriberInvoke))
+ {
+ Assert.NotNull(kvp.Value);
+ Assert.IsType(kvp.Value);
+ Assert.Equal(operationId, ((SubscriberInvokeErrorEventData)kvp.Value).OperationId);
+ Assert.Equal(ex, ((SubscriberInvokeErrorEventData)kvp.Value).Exception);
+ }
+ });
+ }
+
+ private ConsumerContext FackConsumerContext()
+ {
+ //Mock description
+ var description = new ConsumerExecutorDescriptor
+ {
+ MethodInfo = GetType().GetMethod("WriteSubscriberInvokeAfterTest"),
+ Attribute = new CapSubscribeAttribute("xxx"),
+ ImplTypeInfo = GetType().GetTypeInfo()
+ };
+
+ //Mock messageContext
+ var messageContext = new MessageContext
+ {
+ Name= "Name",
+ Group= "Group",
+ Content = "Content"
+ };
+
+ return new ConsumerContext(description, messageContext);
+ }
+
+ private void DiagnosticsWapper(Action operation, Action> assert, [CallerMemberName]string methodName = "")
+ {
+ FakeDiagnosticListenerObserver diagnosticListenerObserver = new FakeDiagnosticListenerObserver(assert);
+
+ diagnosticListenerObserver.Enable();
+ using (DiagnosticListener.AllListeners.Subscribe(diagnosticListenerObserver))
+ {
+ Console.WriteLine(string.Format("Test: {0} Enabled Listeners", methodName));
+ operation();
+ }
+ }
+ }
+}
diff --git a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
index 09e9dd9..7216569 100644
--- a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
+++ b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
@@ -1,21 +1,19 @@
- netcoreapp2.0
- true
- DotNetCore.CAP.Test
- true
+ netcoreapp2.1
+ false
-
+
-
-
+
+
-
+
diff --git a/test/DotNetCore.CAP.Test/FakeDiagnosticListenerObserver.cs b/test/DotNetCore.CAP.Test/FakeDiagnosticListenerObserver.cs
new file mode 100644
index 0000000..81db2c8
--- /dev/null
+++ b/test/DotNetCore.CAP.Test/FakeDiagnosticListenerObserver.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Text;
+using DotNetCore.CAP.Diagnostics;
+
+namespace DotNetCore.CAP.Test
+{
+ public sealed class FakeDiagnosticListenerObserver : IObserver
+ {
+ private class FakeDiagnosticSourceWriteObserver : IObserver>
+ {
+ private readonly Action> _writeCallback;
+
+ public FakeDiagnosticSourceWriteObserver(Action> writeCallback)
+ {
+ _writeCallback = writeCallback;
+ }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(KeyValuePair value)
+ {
+ _writeCallback(value);
+ }
+ }
+
+ private readonly Action> _writeCallback;
+ private bool _writeObserverEnabled;
+
+ public FakeDiagnosticListenerObserver(Action> writeCallback)
+ {
+ _writeCallback = writeCallback;
+ }
+
+ public void OnCompleted()
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ }
+
+ public void OnNext(DiagnosticListener value)
+ {
+ if (value.Name.Equals(CapDiagnosticListenerExtensions.DiagnosticListenerName))
+ {
+ value.Subscribe(new FakeDiagnosticSourceWriteObserver(_writeCallback), IsEnabled);
+ }
+ }
+
+ public void Enable()
+ {
+ _writeObserverEnabled = true;
+ }
+ public void Disable()
+ {
+ _writeObserverEnabled = false;
+ }
+ private bool IsEnabled(string s)
+ {
+ return _writeObserverEnabled;
+ }
+ }
+}
diff --git a/test/DotNetCore.CAP.Test/HelperTest.cs b/test/DotNetCore.CAP.Test/HelperTest.cs
new file mode 100644
index 0000000..48ee66f
--- /dev/null
+++ b/test/DotNetCore.CAP.Test/HelperTest.cs
@@ -0,0 +1,154 @@
+using System;
+using System.Reflection;
+using DotNetCore.CAP.Diagnostics;
+using DotNetCore.CAP.Infrastructure;
+using Newtonsoft.Json.Linq;
+using Xunit;
+
+namespace DotNetCore.CAP.Test
+{
+ public class HelperTest
+ {
+
+ [Fact]
+ public void ToTimestampTest()
+ {
+ //Arrange
+ var time = DateTime.Parse("2018-01-01 00:00:00");
+
+ //Act
+ var result = Helper.ToTimestamp(time);
+
+ //Assert
+ Assert.Equal(1514764800, result);
+ }
+
+ [Fact]
+ public void FromTimestampTest()
+ {
+ //Arrange
+ var time = DateTime.Parse("2018-01-01 00:00:00");
+
+ //Act
+ var result = Helper.FromTimestamp(1514764800);
+
+ //Assert
+ Assert.Equal(time, result);
+ }
+
+ [Fact]
+ public void IsControllerTest()
+ {
+ //Arrange
+ var typeInfo = typeof(HomeController).GetTypeInfo();
+
+ //Act
+ var result = Helper.IsController(typeInfo);
+
+ //Assert
+ Assert.True(result);
+ }
+
+ [Theory]
+ [InlineData(typeof(string))]
+ [InlineData(typeof(decimal))]
+ [InlineData(typeof(DateTime))]
+ [InlineData(typeof(DateTimeOffset))]
+ [InlineData(typeof(Guid))]
+ [InlineData(typeof(TimeSpan))]
+ [InlineData(typeof(Uri))]
+ public void IsSimpleTypeTest(Type type)
+ {
+ //Act
+ var result = Helper.IsComplexType(type);
+
+ //Assert
+ Assert.False(result);
+ }
+
+ [Theory]
+ [InlineData(typeof(HomeController))]
+ [InlineData(typeof(Exception))]
+ [InlineData(typeof(Person))]
+ public void IsComplexTypeTest(Type type)
+ {
+ //Act
+ var result = Helper.IsComplexType(type);
+
+ //Assert
+ Assert.True(result);
+ }
+
+ [Fact]
+ public void AddExceptionPropertyTest()
+ {
+ //Arrange
+ var json = "{}";
+ var exception = new Exception("Test Exception Message")
+ {
+ Source = "Test Source",
+ InnerException = { }
+ };
+
+ var expected = new
+ {
+ ExceptionMessage = new
+ {
+ Source = "Test Source",
+ Message = "Test Exception Message",
+ InnerMessage = new { }
+ }
+ };
+
+ //Act
+ var result = Helper.AddExceptionProperty(json, exception);
+
+ //Assert
+ var jObj = JObject.Parse(result);
+ Assert.Equal(jObj["ExceptionMessage"]["Source"].Value(), expected.ExceptionMessage.Source);
+ Assert.Equal(jObj["ExceptionMessage"]["Message"].Value(), expected.ExceptionMessage.Message);
+ }
+
+ [Theory]
+ [InlineData("10.0.0.1")]
+ [InlineData("172.16.0.1")]
+ [InlineData("192.168.1.1")]
+ public void IsInnerIPTest(string ipAddress)
+ {
+ Assert.True(Helper.IsInnerIP(ipAddress));
+ }
+
+ [Fact]
+ public void AddTracingHeaderPropertyTest()
+ {
+ //Arrange
+ var json = "{}";
+ var header = new TracingHeaders { { "key", "value" } };
+
+ //Act
+ var result = Helper.AddTracingHeaderProperty(json, header);
+
+ //Assert
+ var expected = "{\"TracingHeaders\":{\"key\":\"value\"}}";
+ Assert.Equal(expected, result);
+ }
+
+ [Fact]
+ public void TryExtractTracingHeadersTest()
+ {
+ //Arrange
+ var json = "{\"TracingHeaders\":{\"key\":\"value\"}}";
+ TracingHeaders header = null;
+ string removedHeadersJson = "";
+
+ //Act
+ var result = Helper.TryExtractTracingHeaders(json, out header, out removedHeadersJson);
+
+ //Assert
+ Assert.True(result);
+ Assert.NotNull(header);
+ Assert.Single(header);
+ Assert.Equal("{}", removedHeadersJson);
+ }
+ }
+}
diff --git a/test/DotNetCore.CAP.Test/Processor/DefaultDispatcherTest.cs b/test/DotNetCore.CAP.Test/Processor/DefaultDispatcherTest.cs
deleted file mode 100644
index 1f3c77e..0000000
--- a/test/DotNetCore.CAP.Test/Processor/DefaultDispatcherTest.cs
+++ /dev/null
@@ -1,83 +0,0 @@
-//using System;
-//using System.Threading;
-//using System.Threading.Tasks;
-//using DotNetCore.CAP.Models;
-//using DotNetCore.CAP.Processor;
-//using Microsoft.Extensions.DependencyInjection;
-//using Microsoft.Extensions.Options;
-//using Moq;
-//using Xunit;
-
-//namespace DotNetCore.CAP.Test
-//{
-// public class DefaultDispatcherTest
-// {
-// private CancellationTokenSource _cancellationTokenSource;
-// private ProcessingContext _context;
-// private IServiceProvider _provider;
-// private Mock _mockStorageConnection;
-
-// public DefaultDispatcherTest()
-// {
-// _mockStorageConnection = new Mock();
-
-// _cancellationTokenSource = new CancellationTokenSource();
-
-// var services = new ServiceCollection();
-// services.AddLogging();
-// services.Configure>(x => { });
-// services.AddOptions();
-// services.AddSingleton(_mockStorageConnection.Object);
-// _provider = services.BuildServiceProvider();
-
-// _context = new ProcessingContext(_provider, _cancellationTokenSource.Token);
-// }
-
-// [Fact]
-// public void MockTest()
-// {
-// Assert.NotNull(_provider.GetServices());
-// }
-
-// [Fact]
-// public async void ProcessAsync_CancellationTokenCancelled_ThrowsImmediately()
-// {
-// // Arrange
-// _cancellationTokenSource.Cancel();
-// var fixture = Create();
-
-// // Act
-// await Assert.ThrowsAsync(() => fixture.ProcessAsync(_context));
-// }
-
-// [Fact]
-// public async Task ProcessAsync()
-// {
-// // Arrange
-// var job = new CapPublishedMessage
-// {
-// };
-
-// var mockFetchedJob = Mock.Get(Mock.Of(fj => fj.MessageId == 42 && fj.MessageType == MessageType.Publish));
-
-// _mockStorageConnection
-// .Setup(m => m.FetchNextMessageAsync())
-// .ReturnsAsync(mockFetchedJob.Object).Verifiable();
-
-// _mockQueueExecutor
-// .Setup(x => x.ExecuteAsync(_mockStorageConnection.Object, mockFetchedJob.Object))
-// .Returns(Task.FromResult(OperateResult.Success));
-
-// var fixture = Create();
-
-// // Act
-// await fixture.ProcessAsync(_context);
-
-// // Assert
-// _mockStorageConnection.VerifyAll();
-// }
-
-// private DefaultDispatcher Create()
-// => _provider.GetService();
-// }
-//}
\ No newline at end of file