diff --git a/.gitignore b/.gitignore index 9960815..96cda33 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,4 @@ bin/ /.idea/.idea.CAP /.idea/.idea.CAP /.idea +Properties \ No newline at end of file diff --git a/CAP.sln b/CAP.sln index 35a9e3d..d211d0c 100644 --- a/CAP.sln +++ b/CAP.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26430.15 +VisualStudioVersion = 15.0.26430.16 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}" EndProject @@ -35,8 +35,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNe EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka", "samples\Sample.Kafka\Sample.Kafka.csproj", "{2F095ED9-5BC9-4512-9013-A47685FB2508}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Kafka", "src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj", "{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RabbitMQ", "src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj", "{9961B80E-0718-4280-B2A0-271B003DE26B}" @@ -59,6 +57,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer.Test", "test\DotNetCore.CAP.SqlServer.Test\DotNetCore.CAP.SqlServer.Test.csproj", "{DA00FA38-C4B9-4F55-8756-D480FBC1084F}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql", "src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj", "{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test", "test\DotNetCore.CAP.MySql.Test\DotNetCore.CAP.MySql.Test.csproj", "{80A84F62-1558-427B-BA74-B47AA8A665B5}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,10 +75,6 @@ Global {E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.Build.0 = Debug|Any CPU {E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.ActiveCfg = Release|Any CPU {E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.Build.0 = Release|Any CPU - {2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.Build.0 = Debug|Any CPU - {2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.ActiveCfg = Release|Any CPU - {2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.Build.0 = Release|Any CPU {C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Debug|Any CPU.Build.0 = Debug|Any CPU {C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -92,6 +94,22 @@ Global {DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Debug|Any CPU.Build.0 = Debug|Any CPU {DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.ActiveCfg = Release|Any CPU {DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.Build.0 = Release|Any CPU + {FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Release|Any CPU.Build.0 = Release|Any CPU + {80A84F62-1558-427B-BA74-B47AA8A665B5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {80A84F62-1558-427B-BA74-B47AA8A665B5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.Build.0 = Release|Any CPU + {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU + {AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,11 +117,14 @@ Global GlobalSection(NestedProjects) = preSolution {9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} - {2F095ED9-5BC9-4512-9013-A47685FB2508} = {3A6B6931-A123-477A-9469-8B468B5385AF} {C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {3B577468-6792-4EF1-9237-15180B176A24} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {DA00FA38-C4B9-4F55-8756-D480FBC1084F} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} + {FA15685A-778A-4D2A-A2FE-27FAD2FFA65B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} + {9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection EndGlobal diff --git a/appveyor.yml b/appveyor.yml index 3d78602..2da2824 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -4,8 +4,10 @@ environment: BUILDING_ON_PLATFORM: win BuildEnvironment: appveyor Cap_SqlServer_ConnectionStringTemplate: Server=(local)\SQL2014;Database={0};User ID=sa;Password=Password12! + Cap_MySql_ConnectionStringTemplate: Server=localhost;Database={0};Uid=root;Pwd=Password12! services: - mssql2014 + - mysql build_script: - ps: ./ConfigureMSDTC.ps1 - ps: ./build.ps1 diff --git a/build/common.props b/build/common.props index c82b701..aa53f57 100644 --- a/build/common.props +++ b/build/common.props @@ -14,8 +14,8 @@ https://avatars2.githubusercontent.com/u/19404084 https://github.com/dotnetcore/CAP https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt - aspnetcore;cap;consistency - Eventually consistency in distributed architectures. + eventbus;rabbitmq;kafka;cap;transaction; + EventBus and eventually consistency in distributed architectures. diff --git a/build/version.props b/build/version.props index a3d04c3..7a220b4 100644 --- a/build/version.props +++ b/build/version.props @@ -1,8 +1,8 @@ 1 - 0 - 1 + 1 + 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/samples/Sample.Kafka.SqlServer/AppDbContext.cs b/samples/Sample.Kafka.SqlServer/AppDbContext.cs new file mode 100644 index 0000000..53cecb7 --- /dev/null +++ b/samples/Sample.Kafka.SqlServer/AppDbContext.cs @@ -0,0 +1,13 @@ +using Microsoft.EntityFrameworkCore; + +namespace Sample.Kafka +{ + public class AppDbContext : DbContext + { + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + //optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); + optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); + } + } +} diff --git a/samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs b/samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs new file mode 100644 index 0000000..61b511a --- /dev/null +++ b/samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs @@ -0,0 +1,47 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using DotNetCore.CAP; +using Microsoft.AspNetCore.Mvc; + +namespace Sample.Kafka.Controllers +{ + [Route("api/[controller]")] + public class ValuesController : Controller, ICapSubscribe + { + private readonly ICapPublisher _capBus; + private readonly AppDbContext _dbContext; + + public ValuesController(ICapPublisher producer, AppDbContext dbContext) + { + _capBus = producer; + _dbContext = dbContext; + } + + [Route("~/publish")] + public IActionResult PublishMessage() + { + _capBus.Publish("sample.rabbitmq.mysql", ""); + return Ok(); + } + + [Route("~/publishWithTrans")] + public async Task PublishMessageWithTransaction() + { + using (var trans = await _dbContext.Database.BeginTransactionAsync()) + { + await _capBus.PublishAsync("sample.rabbitmq.mysql", ""); + trans.Commit(); + } + return Ok(); + } + + [NonAction] + [CapSubscribe("sample.kafka.sqlserver", Group = "test")] + public void KafkaTest() + { + Console.WriteLine("[sample.kafka.sqlserver] message received"); + Debug.WriteLine("[sample.kafka.sqlserver] message received"); + } + } +} \ No newline at end of file diff --git a/samples/Sample.Kafka/Program.cs b/samples/Sample.Kafka.SqlServer/Program.cs similarity index 57% rename from samples/Sample.Kafka/Program.cs rename to samples/Sample.Kafka.SqlServer/Program.cs index 659106f..37d3089 100644 --- a/samples/Sample.Kafka/Program.cs +++ b/samples/Sample.Kafka.SqlServer/Program.cs @@ -1,13 +1,21 @@ using System.IO; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; namespace Sample.Kafka { public class Program { - public static void Main(string[] args) { + public static void Main(string[] args) + { + var config = new ConfigurationBuilder() + .AddCommandLine(args) + .AddEnvironmentVariables("ASPNETCORE_") + .Build(); + var host = new WebHostBuilder() + .UseConfiguration(config) .UseKestrel() .UseContentRoot(Directory.GetCurrentDirectory()) .UseIISIntegration() diff --git a/samples/Sample.Kafka/Sample.Kafka.csproj b/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj similarity index 87% rename from samples/Sample.Kafka/Sample.Kafka.csproj rename to samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj index 5c9e38f..675a95b 100644 --- a/samples/Sample.Kafka/Sample.Kafka.csproj +++ b/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj @@ -2,14 +2,9 @@ netcoreapp1.1 + Sample.Kafka.SqlServer - - - - - - - + @@ -17,6 +12,7 @@ + diff --git a/samples/Sample.Kafka.SqlServer/Startup.cs b/samples/Sample.Kafka.SqlServer/Startup.cs new file mode 100644 index 0000000..08291c5 --- /dev/null +++ b/samples/Sample.Kafka.SqlServer/Startup.cs @@ -0,0 +1,33 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Sample.Kafka +{ + public class Startup + { + public void ConfigureServices(IServiceCollection services) + { + services.AddDbContext(); + + services.AddCap(x => + { + x.UseEntityFramework(); + x.UseKafka("localhost:9092"); + }); + + services.AddMvc(); + } + + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + { + loggerFactory.AddConsole(); + loggerFactory.AddDebug(); + + app.UseMvc(); + + app.UseCap(); + } + } +} \ No newline at end of file diff --git a/samples/Sample.Kafka/AppDbContext.cs b/samples/Sample.Kafka/AppDbContext.cs deleted file mode 100644 index fdfd723..0000000 --- a/samples/Sample.Kafka/AppDbContext.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Microsoft.EntityFrameworkCore; - -namespace Sample.Kafka -{ - public class AppDbContext : DbContext - { - protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) - { - optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); - //optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); - } - } -} diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs deleted file mode 100644 index 6783b19..0000000 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System; -using System.Threading.Tasks; -using DotNetCore.CAP; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Mvc; - -namespace Sample.Kafka.Controllers -{ - [Route("api/[controller]")] - public class ValuesController : Controller, ICapSubscribe - { - private readonly ICapPublisher _producer; - private readonly AppDbContext _dbContext ; - - public ValuesController(ICapPublisher producer, AppDbContext dbContext) - { - _producer = producer; - _dbContext = dbContext; - } - - [Route("/")] - public IActionResult Index() - { - return Ok(); - } - public string ServerPath => ((IHostingEnvironment)HttpContext.RequestServices.GetService(typeof(IHostingEnvironment))).ContentRootPath; - - [CapSubscribe("zzwl.topic.finace.callBack", Group = "test")] - public void KafkaTest(Person person) - { - Console.WriteLine(DateTime.Now); - } - - [Route("~/send")] - public async Task SendTopic() - { - using (var trans = _dbContext.Database.BeginTransaction()) - { - await _producer.PublishAsync("zzwl.topic.finace.callBack",""); - - trans.Commit(); - } - - return Ok(); - } - - public class Person - { - public string Name { get; set; } - - public int Age { get; set; } - } - } -} \ No newline at end of file diff --git a/samples/Sample.Kafka/Properties/launchSettings.json b/samples/Sample.Kafka/Properties/launchSettings.json deleted file mode 100644 index a057a71..0000000 --- a/samples/Sample.Kafka/Properties/launchSettings.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:49909/", - "sslPort": 0 - } - }, - "profiles": { - "IIS Express": { - "commandName": "IISExpress", - "launchBrowser": true, - "launchUrl": "api/values", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, - "Sample.Kafka": { - "commandName": "Project", - "launchBrowser": true, - "launchUrl": "api/values", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - }, - "applicationUrl": "http://localhost:49910" - } - } -} diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs deleted file mode 100644 index add80bb..0000000 --- a/samples/Sample.Kafka/Startup.cs +++ /dev/null @@ -1,50 +0,0 @@ -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace Sample.Kafka -{ - public class Startup - { - public Startup(IHostingEnvironment env) - { - var builder = new ConfigurationBuilder() - .SetBasePath(env.ContentRootPath) - .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) - .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true) - .AddEnvironmentVariables(); - Configuration = builder.Build(); - } - - public IConfigurationRoot Configuration { get; } - - // This method gets called by the runtime. Use this method to add services to the container. - public void ConfigureServices(IServiceCollection services) - { - services.AddDbContext(); - - services.AddCap(x => - { - x.UseEntityFramework(); - //x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); - x.UseKafka("localhost:9092"); - }); - - // Add framework services. - services.AddMvc(); - } - - // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. - public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) - { - loggerFactory.AddConsole(Configuration.GetSection("Logging")); - loggerFactory.AddDebug(); - - app.UseMvc(); - - app.UseCap(); - } - } -} \ No newline at end of file diff --git a/samples/Sample.Kafka/appsettings.Development.json b/samples/Sample.Kafka/appsettings.Development.json deleted file mode 100644 index fa8ce71..0000000 --- a/samples/Sample.Kafka/appsettings.Development.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "Logging": { - "IncludeScopes": false, - "LogLevel": { - "Default": "Debug", - "System": "Information", - "Microsoft": "Information" - } - } -} diff --git a/samples/Sample.Kafka/appsettings.json b/samples/Sample.Kafka/appsettings.json deleted file mode 100644 index 5fff67b..0000000 --- a/samples/Sample.Kafka/appsettings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "Logging": { - "IncludeScopes": false, - "LogLevel": { - "Default": "Warning" - } - } -} diff --git a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs new file mode 100644 index 0000000..5a60da7 --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; + +namespace Sample.RabbitMQ.MySql +{ + public class AppDbContext : DbContext + { + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;Uid=root;Pwd=123123;"); + } + } +} diff --git a/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs new file mode 100644 index 0000000..eec782e --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using DotNetCore.CAP; +using Microsoft.AspNetCore.Mvc; + +namespace Sample.RabbitMQ.MySql.Controllers +{ + [Route("api/[controller]")] + public class ValuesController : Controller + { + private readonly AppDbContext _dbContext; + private readonly ICapPublisher _capBus; + + public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher) + { + _dbContext = dbContext; + _capBus = capPublisher; + } + + [Route("~/publish")] + public IActionResult PublishMessage() + { + _capBus.Publish("sample.kafka.sqlserver", ""); + + return Ok(); + } + + [Route("~/publishWithTrans")] + public async Task PublishMessageWithTransaction() + { + using (var trans = await _dbContext.Database.BeginTransactionAsync()) + { + await _capBus.PublishAsync("sample.kafka.sqlserver", ""); + trans.Commit(); + } + return Ok(); + } + + [NonAction] + [CapSubscribe("sample.rabbitmq.mysql")] + public void ReceiveMessage() + { + Console.WriteLine("[sample.rabbitmq.mysql] message received"); + Debug.WriteLine("[sample.rabbitmq.mysql] message received"); + } + } +} diff --git a/samples/Sample.RabbitMQ.MySql/Program.cs b/samples/Sample.RabbitMQ.MySql/Program.cs new file mode 100644 index 0000000..0f7c032 --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/Program.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; + +namespace Sample.RabbitMQ.MySql +{ + public class Program + { + public static void Main(string[] args) + { + var config = new ConfigurationBuilder() + .AddCommandLine(args) + .AddEnvironmentVariables("ASPNETCORE_") + .Build(); + + var host = new WebHostBuilder() + .UseConfiguration(config) + .UseKestrel() + .UseContentRoot(Directory.GetCurrentDirectory()) + .UseIISIntegration() + .UseStartup() + .Build(); + + host.Run(); + } + } +} diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj new file mode 100644 index 0000000..e90171f --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj @@ -0,0 +1,28 @@ + + + + netcoreapp1.1 + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs new file mode 100644 index 0000000..5a3d92f --- /dev/null +++ b/samples/Sample.RabbitMQ.MySql/Startup.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Sample.RabbitMQ.MySql +{ + public class Startup + { + public void ConfigureServices(IServiceCollection services) + { + services.AddDbContext(); + + services.AddCap(x => + { + x.UseEntityFramework(); + x.UseKafka("localhost:9092"); + }); + + services.AddMvc(); + } + + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory) + { + loggerFactory.AddConsole(); + loggerFactory.AddDebug(); + + app.UseMvc(); + + app.UseCap(); + } + } +} diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs index 2dbf888..8d9bf98 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs @@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP { - public class KafkaCapOptionsExtension : ICapOptionsExtension + internal sealed class KafkaCapOptionsExtension : ICapOptionsExtension { private readonly Action _configure; @@ -16,12 +16,10 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { - services.Configure(_configure); - var kafkaOptions = new KafkaOptions(); - _configure(kafkaOptions); + _configure?.Invoke(kafkaOptions); services.AddSingleton(kafkaOptions); - + services.AddSingleton(); services.AddTransient(); } diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs index 34cf92d..e61ccee 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs @@ -21,30 +21,34 @@ namespace DotNetCore.CAP /// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter. /// /// - public IDictionary MainConfig { get; private set; } + public readonly IDictionary MainConfig; /// - /// The `bootstrap.servers` item config of `MainConfig`. + /// The `bootstrap.servers` item config of . /// /// Initial list of brokers as a CSV list of broker host or host:port. /// /// public string Servers { get; set; } - internal IEnumerable> AsRdkafkaConfig() + internal IEnumerable> AskafkaConfig() { if (MainConfig.ContainsKey("bootstrap.servers")) + { return MainConfig.AsEnumerable(); + } - if (string.IsNullOrEmpty(Servers)) + if (string.IsNullOrWhiteSpace(Servers)) { throw new ArgumentNullException(nameof(Servers)); } - else - { - MainConfig.Add("bootstrap.servers", Servers); - } + + MainConfig.Add("bootstrap.servers", Servers); + + MainConfig["queue.buffering.max.ms"] = "10"; + MainConfig["socket.blocking.max.ms"] = "10"; MainConfig["enable.auto.commit"] = "false"; + return MainConfig.AsEnumerable(); } } diff --git a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs index fa6dd2b..5bf11b1 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs @@ -6,6 +6,10 @@ namespace Microsoft.Extensions.DependencyInjection { public static class CapOptionsExtensions { + /// + /// Configuration to use kafka in CAP. + /// + /// Kafka bootstrap server urls. public static CapOptions UseKafka(this CapOptions options, string bootstrapServers) { return options.UseKafka(opt => @@ -14,6 +18,11 @@ namespace Microsoft.Extensions.DependencyInjection }); } + /// + /// Configuration to use kafka in CAP. + /// + /// Provides programmatic configuration for the kafka . + /// public static CapOptions UseKafka(this CapOptions options, Action configure) { if (configure == null) throw new ArgumentNullException(nameof(configure)); diff --git a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs index 06ec6e6..161b073 100644 --- a/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs +++ b/src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs @@ -3,6 +3,9 @@ // ReSharper disable once CheckNamespace namespace DotNetCore.CAP { + /// + /// An attribute for subscribe Kafka messages. + /// public class CapSubscribeAttribute : TopicAttribute { public CapSubscribeAttribute(string name) diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj index 81686f3..0a8fe64 100644 --- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj +++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index 1993d80..1b65188 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Text; using System.Threading; using Confluent.Kafka; @@ -6,36 +7,37 @@ using Confluent.Kafka.Serialization; namespace DotNetCore.CAP.Kafka { - public class KafkaConsumerClient : IConsumerClient + internal sealed class KafkaConsumerClient : IConsumerClient { private readonly string _groupId; private readonly KafkaOptions _kafkaOptions; private Consumer _consumerClient; - public event EventHandler MessageReceieved; + public event EventHandler OnMessageReceieved; + + public event EventHandler OnError; public IDeserializer StringDeserializer { get; set; } public KafkaConsumerClient(string groupId, KafkaOptions options) { _groupId = groupId; - _kafkaOptions = options; + _kafkaOptions = options ?? throw new ArgumentNullException(nameof(options)); StringDeserializer = new StringDeserializer(Encoding.UTF8); } - public void Subscribe(string topic) + public void Subscribe(IEnumerable topics) { - Subscribe(topic, 0); - } + if (topics == null) + throw new ArgumentNullException(nameof(topics)); - public void Subscribe(string topicName, int partition) - { if (_consumerClient == null) { InitKafkaClient(); } - _consumerClient.Assignment.Add(new TopicPartition(topicName, partition)); - _consumerClient.Subscribe(topicName); + + //_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0))); + _consumerClient.Subscribe(topics); } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) @@ -63,10 +65,11 @@ namespace DotNetCore.CAP.Kafka { _kafkaOptions.MainConfig.Add("group.id", _groupId); - var config = _kafkaOptions.AsRdkafkaConfig(); + var config = _kafkaOptions.AskafkaConfig(); _consumerClient = new Consumer(config, null, StringDeserializer); _consumerClient.OnMessage += ConsumerClient_OnMessage; + _consumerClient.OnError += ConsumerClient_OnError; } private void ConsumerClient_OnMessage(object sender, Message e) @@ -77,7 +80,13 @@ namespace DotNetCore.CAP.Kafka Name = e.Topic, Content = e.Value }; - MessageReceieved?.Invoke(sender, message); + + OnMessageReceieved?.Invoke(sender, message); + } + + private void ConsumerClient_OnError(object sender, Error e) + { + OnError?.Invoke(sender, e.Reason); } #endregion private methods diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs index 4f011c0..ea2ee67 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs @@ -1,14 +1,15 @@ -using Microsoft.Extensions.Options; +using System; +using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka { - public class KafkaConsumerClientFactory : IConsumerClientFactory + internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory { private readonly KafkaOptions _kafkaOptions; - public KafkaConsumerClientFactory(IOptions kafkaOptions) + public KafkaConsumerClientFactory(KafkaOptions kafkaOptions) { - _kafkaOptions = kafkaOptions.Value; + _kafkaOptions = kafkaOptions; } public IConsumerClient Create(string groupId) diff --git a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs index 8252629..a3f4c80 100644 --- a/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs @@ -2,52 +2,56 @@ using System.Text; using System.Threading.Tasks; using Confluent.Kafka; -using Confluent.Kafka.Serialization; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Kafka { - public class PublishQueueExecutor : BasePublishQueueExecutor + internal class PublishQueueExecutor : BasePublishQueueExecutor { private readonly ILogger _logger; private readonly KafkaOptions _kafkaOptions; public PublishQueueExecutor(IStateChanger stateChanger, - IOptions options, + KafkaOptions options, ILogger logger) : base(stateChanger, logger) { _logger = logger; - _kafkaOptions = options.Value; + _kafkaOptions = options; } public override Task PublishAsync(string keyName, string content) { try { - var config = _kafkaOptions.AsRdkafkaConfig(); - using (var producer = new Producer(config, null, new StringSerializer(Encoding.UTF8))) + var config = _kafkaOptions.AskafkaConfig(); + var contentBytes = Encoding.UTF8.GetBytes(content); + using (var producer = new Producer(config)) { - producer.ProduceAsync(keyName, null, content); - producer.Flush(); - } + var message = producer.ProduceAsync(keyName, null, contentBytes).Result; - _logger.LogDebug($"kafka topic message [{keyName}] has been published."); + if (!message.Error.HasError) + { + _logger.LogDebug($"kafka topic message [{keyName}] has been published."); - return Task.FromResult(OperateResult.Success); + return Task.FromResult(OperateResult.Success); + } + else + { + return Task.FromResult(OperateResult.Failed(new OperateError + { + Code = message.Error.Code.ToString(), + Description = message.Error.Reason + })); + } + } } catch (Exception ex) { _logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}"); - return Task.FromResult(OperateResult.Failed(ex, - new OperateError() - { - Code = ex.HResult.ToString(), - Description = ex.Message - })); + return Task.FromResult(OperateResult.Failed(ex)); } } } diff --git a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs new file mode 100644 index 0000000..d7a8ffa --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs @@ -0,0 +1,13 @@ +using System; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class EFOptions + { + /// + /// EF dbcontext type. + /// + internal Type DbContextType { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs new file mode 100644 index 0000000..5682e38 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs @@ -0,0 +1,44 @@ +using System; +using DotNetCore.CAP.MySql; +using DotNetCore.CAP.Processor; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal class MySqlCapOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public MySqlCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddTransient(); + + var mysqlOptions = new MySqlOptions(); + _configure(mysqlOptions); + + if (mysqlOptions.DbContextType != null) + { + var provider = TempBuildService(services); + var dbContextObj = provider.GetService(mysqlOptions.DbContextType); + var dbContext = (DbContext)dbContextObj; + mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; + } + services.AddSingleton(mysqlOptions); + } + + private IServiceProvider TempBuildService(IServiceCollection services) + { + return services.BuildServiceProvider(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs new file mode 100644 index 0000000..7ad3801 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs @@ -0,0 +1,13 @@ +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class MySqlOptions : EFOptions + { + /// + /// Gets or sets the database's connection string that will be used to store database entities. + /// + public string ConnectionString { get; set; } + + public string TableNamePrefix { get; set; } = "cap"; + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs new file mode 100644 index 0000000..f838262 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs @@ -0,0 +1,49 @@ +using System; +using DotNetCore.CAP; +using Microsoft.EntityFrameworkCore; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UseMySql(this CapOptions options, string connectionString) + { + return options.UseMySql(opt => + { + opt.ConnectionString = connectionString; + }); + } + + public static CapOptions UseMySql(this CapOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new MySqlCapOptionsExtension(configure)); + + return options; + } + + public static CapOptions UseEntityFramework(this CapOptions options) + where TContext : DbContext + { + return options.UseEntityFramework(opt => + { + opt.DbContextType = typeof(TContext); + }); + } + + public static CapOptions UseEntityFramework(this CapOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var efOptions = new EFOptions { DbContextType = typeof(TContext) }; + configure(efOptions); + + options.RegisterExtension(new MySqlCapOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/CapPublisher.cs b/src/DotNetCore.CAP.MySql/CapPublisher.cs new file mode 100644 index 0000000..82476b3 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/CapPublisher.cs @@ -0,0 +1,203 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.MySql +{ + public class CapPublisher : ICapPublisher + { + private readonly ILogger _logger; + private readonly MySqlOptions _options; + private readonly DbContext _dbContext; + + protected bool IsCapOpenedTrans { get; set; } + + protected bool IsUsingEF { get; } + + protected IServiceProvider ServiceProvider { get; } + + public CapPublisher(IServiceProvider provider, + ILogger logger, + MySqlOptions options) + { + ServiceProvider = provider; + _logger = logger; + _options = options; + + if (_options.DbContextType != null) + { + IsUsingEF = true; + _dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); + } + } + + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Serialize(contentObj); + + PublishCore(name, content); + } + + public Task PublishAsync(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Serialize(contentObj); + + return PublishCoreAsync(name, content); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + PrepareConnection(dbConnection, ref dbTransaction); + + var content = Serialize(contentObj); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + PrepareConnection(dbConnection, ref dbTransaction); + + var content = Serialize(contentObj); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + #region private methods + + private string Serialize(T obj) + { + string content = string.Empty; + if (Helper.IsComplexType(typeof(T))) + { + content = Helper.ToJson(obj); + } + else + { + content = obj?.ToString(); + } + return content; + } + + private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction) + { + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + if (dbConnection.State != ConnectionState.Open) + dbConnection.Open(); + + if (dbTransaction == null) + { + IsCapOpenedTrans = true; + dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } + } + + private void CheckIsUsingEF(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } + + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); + } + + private async Task PublishCoreAsync(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + if (transaction == null) + { + IsCapOpenedTrans = true; + transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + } + var dbTransaction = transaction.GetDbTransaction(); + await PublishWithTransAsync(name, content, connection, dbTransaction); + } + + private void PublishCore(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + if (transaction == null) + { + IsCapOpenedTrans = true; + transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + } + var dbTransaction = transaction.GetDbTransaction(); + PublishWithTrans(name, content, connection, dbTransaction); + } + + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + + PublishQueuer.PulseEvent.Set(); + } + + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + PublishQueuer.PulseEvent.Set(); + } + + private string PrepareSql() + { + return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + + #endregion private methods + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj new file mode 100644 index 0000000..f9f2c51 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj @@ -0,0 +1,27 @@ + + + + + + netstandard1.6 + DotNetCore.CAP.MySql + DotNetCore.CAP.MySql + 1.6.1 + $(PackageTargetFallback);dnxcore50 + false + false + false + + + + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.MySql/FetchedMessage.cs b/src/DotNetCore.CAP.MySql/FetchedMessage.cs new file mode 100644 index 0000000..a0f3b8e --- /dev/null +++ b/src/DotNetCore.CAP.MySql/FetchedMessage.cs @@ -0,0 +1,11 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.MySql +{ + internal class FetchedMessage + { + public int MessageId { get; set; } + + public MessageType MessageType { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs new file mode 100644 index 0000000..b8e5922 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.Logging; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + internal class DefaultAdditionalProcessor : IAdditionalProcessor + { + private readonly IServiceProvider _provider; + private readonly ILogger _logger; + private readonly MySqlOptions _options; + + private const int MaxBatch = 1000; + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2); + + public DefaultAdditionalProcessor( + IServiceProvider provider, + ILogger logger, + MySqlOptions mysqlOptions) + { + _logger = logger; + _provider = provider; + _options = mysqlOptions; + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug("Collecting expired entities."); + + var tables = new string[]{ + $"{_options.TableNamePrefix}.published", + $"{_options.TableNamePrefix}.received" + }; + + foreach (var table in tables) + { + var removedCount = 0; + do + { + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + removedCount = await connection.ExecuteAsync($@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;", + new { now = DateTime.Now, count = MaxBatch }); + } + + if (removedCount != 0) + { + await context.WaitAsync(_delay); + context.ThrowIfStopping(); + } + } while (removedCount != 0); + } + + await context.WaitAsync(_waitingInterval); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs new file mode 100644 index 0000000..f9a4ce1 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs @@ -0,0 +1,74 @@ +using System; +using System.Data; +using System.Threading; +using Dapper; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlFetchedMessage : IFetchedMessage + { + private readonly IDbConnection _connection; + private readonly IDbTransaction _transaction; + private readonly Timer _timer; + private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); + private readonly object _lockObject = new object(); + + public MySqlFetchedMessage(int messageId, + MessageType type, + IDbConnection connection, + IDbTransaction transaction) + { + MessageId = messageId; + MessageType = type; + _connection = connection; + _transaction = transaction; + _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); + } + + public int MessageId { get; } + + public MessageType MessageType { get; } + + public void RemoveFromQueue() + { + lock (_lockObject) + { + _transaction.Commit(); + } + } + + public void Requeue() + { + lock (_lockObject) + { + _transaction.Rollback(); + } + } + + public void Dispose() + { + lock (_lockObject) + { + _timer?.Dispose(); + _transaction.Dispose(); + _connection.Dispose(); + } + } + + private void ExecuteKeepAliveQuery(object obj) + { + lock (_lockObject) + { + try + { + _connection?.Execute("SELECT 1", _transaction); + } + catch + { + // ignored + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorage.cs b/src/DotNetCore.CAP.MySql/MySqlStorage.cs new file mode 100644 index 0000000..137a69f --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorage.cs @@ -0,0 +1,66 @@ +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using Microsoft.Extensions.Logging; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorage : IStorage + { + private readonly MySqlOptions _options; + private readonly ILogger _logger; + + public MySqlStorage(ILogger logger, MySqlOptions options) + { + _options = options; + _logger = logger; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + var sql = CreateDbTablesScript(_options.TableNamePrefix); + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + + _logger.LogDebug("Ensuring all create database tables script are applied."); + } + + protected virtual string CreateDbTablesScript(string prefix) + { + var batchSql = + $@" +CREATE TABLE IF NOT EXISTS `{prefix}.queue` ( + `MessageId` int(11) NOT NULL, + `MessageType` tinyint(4) NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `{prefix}.received` ( + `Id` int(127) NOT NULL AUTO_INCREMENT, + `Name` varchar(400) NOT NULL, + `Group` varchar(200) DEFAULT NULL, + `Content` longtext, + `Retries` int(11) DEFAULT NULL, + `Added` datetime(6) NOT NULL, + `ExpiresAt` datetime(6) DEFAULT NULL, + `StatusName` varchar(50) NOT NULL, + PRIMARY KEY (`Id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + +CREATE TABLE IF NOT EXISTS `{prefix}.published` ( + `Id` int(127) NOT NULL AUTO_INCREMENT, + `Name` varchar(200) NOT NULL, + `Content` longtext, + `Retries` int(11) DEFAULT NULL, + `Added` datetime(6) NOT NULL, + `ExpiresAt` datetime(6) DEFAULT NULL, + `StatusName` varchar(40) NOT NULL, + PRIMARY KEY (`Id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8;"; + return batchSql; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs new file mode 100644 index 0000000..946f697 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorageConnection : IStorageConnection + { + private readonly MySqlOptions _options; + private readonly string _prefix; + + public MySqlStorageConnection(MySqlOptions options) + { + _options = options; + _prefix = _options.TableNamePrefix; + } + + public MySqlOptions Options => _options; + + public IStorageTransaction CreateTransaction() + { + return new MySqlStorageTransaction(this); + } + + public async Task GetPublishedMessageAsync(int id) + { + var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public Task FetchNextMessageAsync() + { + //Last execute statement(FOR UPDATE to fix dirty read) : + + //SET TRANSACTION ISOLATION LEVEL READ COMMITTED; + //START TRANSACTION; + //SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; + //DELETE FROM `{_prefix}.queue` LIMIT 1; + //COMMIT; + + var sql = $@" +SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; +DELETE FROM `{_prefix}.queue` LIMIT 1;"; + + return FetchNextMessageCoreAsync(sql); + } + + public async Task GetNextPublishedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + // CapReceviedMessage + + public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $@" +INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) +VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql, message); + } + } + + public async Task GetReceivedMessageAsync(int id) + { + var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task GetNextReceviedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';"; + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + public void Dispose() + { + } + + private async Task FetchNextMessageCoreAsync(string sql, object args = null) + { + //here don't use `using` to dispose + var connection = new MySqlConnection(_options.ConnectionString); + await connection.OpenAsync(); + var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); + FetchedMessage fetchedMessage = null; + try + { + fetchedMessage = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); + } + catch (MySqlException) + { + transaction.Dispose(); + throw; + } + + if (fetchedMessage == null) + { + transaction.Rollback(); + transaction.Dispose(); + connection.Dispose(); + return null; + } + + return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs new file mode 100644 index 0000000..d93be89 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs @@ -0,0 +1,71 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Models; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + public class MySqlStorageTransaction : IStorageTransaction, IDisposable + { + private readonly string _prefix; + + private readonly IDbTransaction _dbTransaction; + private readonly IDbConnection _dbConnection; + + public MySqlStorageTransaction(MySqlStorageConnection connection) + { + var options = connection.Options; + _prefix = options.TableNamePrefix; + + _dbConnection = new MySqlConnection(options.ConnectionString); + _dbConnection.Open(); + _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } + + public void UpdateMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void UpdateMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void EnqueueMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction); + } + + public void EnqueueMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction); + } + + public Task CommitAsync() + { + _dbTransaction.Commit(); + return Task.CompletedTask; + } + + public void Dispose() + { + _dbTransaction.Dispose(); + _dbConnection.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs index 09a90ef..b1061d8 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs @@ -1,4 +1,6 @@ -// ReSharper disable once CheckNamespace +using System; + +// ReSharper disable once CheckNamespace namespace DotNetCore.CAP { public class RabbitMQOptions @@ -27,16 +29,16 @@ namespace DotNetCore.CAP public const string DefaultVHost = "/"; /// - /// Default exchange name (value: "cap"). + /// Default exchange name (value: "cap.default.topic"). /// - public const string DefaultExchangeName = "cap"; + public const string DefaultExchangeName = "cap.default.topic"; + + /// The topic exchange type. + public const string ExchangeType = "topic"; /// The host to connect to. public string HostName { get; set; } = "localhost"; - /// The topic exchange type. - internal const string ExchangeType = "topic"; - /// /// Password to use when authenticating to the server. /// @@ -76,5 +78,10 @@ namespace DotNetCore.CAP /// The port to connect on. /// public int Port { get; set; } = -1; + + /// + /// Gets or sets queue message automatic deletion time (in milliseconds). Default 864000000 ms (10 days). + /// + public int QueueMessageExpires { get; set; } = 864000000; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs index 14eedbb..4202570 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs @@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP { - public class RabbitMQCapOptionsExtension : ICapOptionsExtension + internal sealed class RabbitMQCapOptionsExtension : ICapOptionsExtension { private readonly Action _configure; @@ -16,12 +16,9 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { - services.Configure(_configure); - - var rabbitMQOptions = new RabbitMQOptions(); - _configure(rabbitMQOptions); - - services.AddSingleton(rabbitMQOptions); + var options = new RabbitMQOptions(); + _configure?.Invoke(options); + services.AddSingleton(options); services.AddSingleton(); services.AddTransient(); diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs index 92fb295..811a281 100644 --- a/src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs +++ b/src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs @@ -3,6 +3,9 @@ // ReSharper disable once CheckNamespace namespace DotNetCore.CAP { + /// + /// An attribute for subscribe RabbitMQ messages. + /// public class CapSubscribeAttribute : TopicAttribute { public CapSubscribeAttribute(string name) : base(name) diff --git a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj index 30abf82..3a758b2 100644 --- a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj +++ b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs index 3e439e2..27b777b 100644 --- a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs +++ b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs @@ -8,18 +8,18 @@ using RabbitMQ.Client; namespace DotNetCore.CAP.RabbitMQ { - public class PublishQueueExecutor : BasePublishQueueExecutor + internal sealed class PublishQueueExecutor : BasePublishQueueExecutor { private readonly ILogger _logger; private readonly RabbitMQOptions _rabbitMQOptions; public PublishQueueExecutor(IStateChanger stateChanger, - IOptions options, + RabbitMQOptions options, ILogger logger) : base(stateChanger, logger) { _logger = logger; - _rabbitMQOptions = options.Value; + _rabbitMQOptions = options; } public override Task PublishAsync(string keyName, string content) @@ -43,7 +43,7 @@ namespace DotNetCore.CAP.RabbitMQ { var body = Encoding.UTF8.GetBytes(content); - channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType); + channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, durable: true); channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName, routingKey: keyName, basicProperties: null, diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs index 4dd2883..11d888a 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -7,7 +8,7 @@ using RabbitMQ.Client.Events; namespace DotNetCore.CAP.RabbitMQ { - public class RabbitMQConsumerClient : IConsumerClient + internal sealed class RabbitMQConsumerClient : IConsumerClient { private readonly string _exchageName; private readonly string _queueName; @@ -18,7 +19,9 @@ namespace DotNetCore.CAP.RabbitMQ private IModel _channel; private ulong _deliveryTag; - public event EventHandler MessageReceieved; + public event EventHandler OnMessageReceieved; + + public event EventHandler OnError; public RabbitMQConsumerClient(string queueName, RabbitMQOptions options) { @@ -45,31 +48,42 @@ namespace DotNetCore.CAP.RabbitMQ _connection = _connectionFactory.CreateConnection(); _channel = _connection.CreateModel(); - _channel.ExchangeDeclare(exchange: _exchageName, type: RabbitMQOptions.ExchangeType); - _channel.QueueDeclare(_queueName, exclusive: false); + + _channel.ExchangeDeclare( + exchange: _exchageName, + type: RabbitMQOptions.ExchangeType, + durable: true); + + var arguments = new Dictionary { { "x-message-ttl", (int)_rabbitMQOptions.QueueMessageExpires } }; + _channel.QueueDeclare(_queueName, + durable: true, + exclusive: false, + autoDelete: false, + arguments: arguments); + } + + public void Subscribe(IEnumerable topics) + { + if (topics == null) throw new ArgumentNullException(nameof(topics)); + + foreach (var topic in topics) + { + _channel.QueueBind(_queueName, _exchageName, topic); + } } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { var consumer = new EventingBasicConsumer(_channel); consumer.Received += OnConsumerReceived; + consumer.Shutdown += OnConsumerShutdown; _channel.BasicConsume(_queueName, false, consumer); while (true) { - Task.Delay(timeout, cancellationToken).Wait(); + Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult(); } } - public void Subscribe(string topic) - { - _channel.QueueBind(_queueName, _exchageName, topic); - } - - public void Subscribe(string topic, int partition) - { - _channel.QueueBind(_queueName, _exchageName, topic); - } - public void Commit() { _channel.BasicAck(_deliveryTag, false); @@ -90,7 +104,12 @@ namespace DotNetCore.CAP.RabbitMQ Name = e.RoutingKey, Content = Encoding.UTF8.GetString(e.Body) }; - MessageReceieved?.Invoke(sender, message); + OnMessageReceieved?.Invoke(sender, message); + } + + private void OnConsumerShutdown(object sender, ShutdownEventArgs e) + { + OnError?.Invoke(sender, e.Cause?.ToString()); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs index 2d64f0c..fcd267d 100644 --- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs +++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs @@ -2,13 +2,13 @@ namespace DotNetCore.CAP.RabbitMQ { - public class RabbitMQConsumerClientFactory : IConsumerClientFactory + internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory { private readonly RabbitMQOptions _rabbitMQOptions; - public RabbitMQConsumerClientFactory(IOptions rabbitMQOptions) + public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions) { - _rabbitMQOptions = rabbitMQOptions.Value; + _rabbitMQOptions = rabbitMQOptions; } public IConsumerClient Create(string groupId) diff --git a/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs b/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs index 6d162c5..b48dcd2 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs @@ -16,6 +16,6 @@ namespace DotNetCore.CAP /// /// EF dbcontext type. /// - public Type DbContextType { get; internal set; } + internal Type DbContextType { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs index 0425b8f..2d1878e 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs @@ -7,7 +7,7 @@ using Microsoft.Extensions.DependencyInjection; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP { - public class SqlServerCapOptionsExtension : ICapOptionsExtension + internal class SqlServerCapOptionsExtension : ICapOptionsExtension { private readonly Action _configure; @@ -26,14 +26,13 @@ namespace DotNetCore.CAP var sqlServerOptions = new SqlServerOptions(); _configure(sqlServerOptions); - var provider = TempBuildService(services); - var dbContextObj = provider.GetService(sqlServerOptions.DbContextType); - if (dbContextObj != null) + if (sqlServerOptions.DbContextType != null) { + var provider = TempBuildService(services); + var dbContextObj = provider.GetService(sqlServerOptions.DbContextType); var dbContext = (DbContext)dbContextObj; sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; } - services.Configure(_configure); services.AddSingleton(sqlServerOptions); } diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 5f84093..674b7f7 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -38,25 +38,11 @@ namespace DotNetCore.CAP.SqlServer } } - public void Publish(string name, string content) - { - CheckIsUsingEF(name); - - PublishCore(name, content); - } - - public Task PublishAsync(string name, string content) - { - CheckIsUsingEF(name); - - return PublishCoreAsync(name, content); - } - public void Publish(string name, T contentObj) { CheckIsUsingEF(name); - var content = Helper.ToJson(contentObj); + var content = Serialize(contentObj); PublishCore(name, content); } @@ -65,67 +51,62 @@ namespace DotNetCore.CAP.SqlServer { CheckIsUsingEF(name); - var content = Helper.ToJson(contentObj); + var content = Serialize(contentObj); return PublishCoreAsync(name, content); } - public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) { CheckIsAdoNet(name); + PrepareConnection(dbConnection, ref dbTransaction); - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - IsCapOpenedTrans = true; + var content = Serialize(contentObj); PublishWithTrans(name, content, dbConnection, dbTransaction); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) { CheckIsAdoNet(name); + PrepareConnection(dbConnection, ref dbTransaction); - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - IsCapOpenedTrans = true; + var content = Serialize(contentObj); return PublishWithTransAsync(name, content, dbConnection, dbTransaction); } - public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) - { - CheckIsAdoNet(name); - - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - var content = Helper.ToJson(contentObj); - - dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + #region private methods - PublishWithTrans(name, content, dbConnection, dbTransaction); + private string Serialize(T obj) + { + string content = string.Empty; + if (Helper.IsComplexType(typeof(T))) + { + content = Helper.ToJson(obj); + } + else + { + content = obj.ToString(); + } + return content; } - public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction) { - CheckIsAdoNet(name); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); - var content = Helper.ToJson(contentObj); - - dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + if (dbConnection.State != ConnectionState.Open) + dbConnection.Open(); - return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + if (dbTransaction == null) + { + IsCapOpenedTrans = true; + dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } } - #region private methods - private void CheckIsUsingEF(string name) { if (name == null) throw new ArgumentNullException(nameof(name)); @@ -145,8 +126,11 @@ namespace DotNetCore.CAP.SqlServer { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; - IsCapOpenedTrans = transaction == null; - transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + if (transaction == null) + { + IsCapOpenedTrans = true; + transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + } var dbTransaction = transaction.GetDbTransaction(); await PublishWithTransAsync(name, content, connection, dbTransaction); } @@ -155,8 +139,11 @@ namespace DotNetCore.CAP.SqlServer { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; - IsCapOpenedTrans = transaction == null; - transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + if (transaction == null) + { + IsCapOpenedTrans = true; + transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + } var dbTransaction = transaction.GetDbTransaction(); PublishWithTrans(name, content, connection, dbTransaction); } diff --git a/src/DotNetCore.CAP.SqlServer/FetchedMessage.cs b/src/DotNetCore.CAP.SqlServer/FetchedMessage.cs index 521fdeb..990b61e 100644 --- a/src/DotNetCore.CAP.SqlServer/FetchedMessage.cs +++ b/src/DotNetCore.CAP.SqlServer/FetchedMessage.cs @@ -2,7 +2,7 @@ namespace DotNetCore.CAP.SqlServer { - public class FetchedMessage + internal class FetchedMessage { public int MessageId { get; set; } diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index a324a98..b8fba46 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -6,7 +6,6 @@ using System.Threading.Tasks; using Dapper; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; -using Microsoft.EntityFrameworkCore; namespace DotNetCore.CAP.SqlServer { @@ -102,7 +101,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; public async Task> GetFailedReceviedMessages() { - var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + var sql = $"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; using (var connection = new SqlConnection(_options.ConnectionString)) { return await connection.QueryAsync(sql); @@ -115,7 +114,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; private async Task FetchNextMessageCoreAsync(string sql, object args = null) { - //here don't use `using` to dispose + //here don't use `using` to dispose var connection = new SqlConnection(_options.ConnectionString); await connection.OpenAsync(); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); diff --git a/src/DotNetCore.CAP/Abstractions/ConsumerContext.cs b/src/DotNetCore.CAP/Abstractions/ConsumerContext.cs index 7c8ece6..bb0cbb4 100644 --- a/src/DotNetCore.CAP/Abstractions/ConsumerContext.cs +++ b/src/DotNetCore.CAP/Abstractions/ConsumerContext.cs @@ -1,5 +1,4 @@ using System; -using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Abstractions { diff --git a/src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs b/src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs index f03b105..ad276b7 100644 --- a/src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs +++ b/src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs @@ -7,15 +7,6 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding /// public interface IModelBinder { - /// - /// Attempts to bind a model. - /// - /// The . - /// - /// - /// A which will complete when the model binding process completes. - /// - /// - Task BindModelAsync(ModelBindingContext bindingContext); + Task BindModelAsync(string content); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingContext.cs b/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingContext.cs deleted file mode 100644 index de6f224..0000000 --- a/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingContext.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System; -using Microsoft.Extensions.Primitives; - -namespace DotNetCore.CAP.Abstractions.ModelBinding -{ - /// - /// A context that contains operating information for model binding and validation. - /// - public class ModelBindingContext - { - /// - /// Gets or sets the model value for the current operation. - /// - /// - /// The will typically be set for a binding operation that works - /// against a pre-existing model object to update certain properties. - /// - public object Model { get; set; } - - /// - /// Gets or sets the name of the model. - /// - public string ModelName { get; set; } - - /// - /// Gets or sets the type of the model. - /// - public Type ModelType { get; set; } - - /// - /// Gets or sets the values of the model. - /// - public StringValues Values { get; set; } - - /// - /// - /// Gets or sets a result which represents the result of the model binding process. - /// - /// - public object Result { get; set; } - - /// - /// Creates a new for top-level model binding operation. - /// - public static ModelBindingContext CreateBindingContext(string values, string modelName, Type modelType) - { - return new ModelBindingContext() - { - ModelName = modelName, - ModelType = modelType, - Values = values - }; - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingResult.cs b/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingResult.cs new file mode 100644 index 0000000..b2896c1 --- /dev/null +++ b/src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingResult.cs @@ -0,0 +1,105 @@ +using DotNetCore.CAP.Internal; + +namespace DotNetCore.CAP.Abstractions.ModelBinding +{ + /// + /// Contains the result of model binding. + /// + public struct ModelBindingResult + { + /// + /// Creates a representing a failed model binding operation. + /// + /// A representing a failed model binding operation. + public static ModelBindingResult Failed() + { + return new ModelBindingResult(model: null, isSuccess: false); + } + + /// + /// Creates a representing a successful model binding operation. + /// + /// The model value. May be null. + /// A representing a successful model bind. + public static ModelBindingResult Success(object model) + { + return new ModelBindingResult(model, isSuccess: true); + } + + private ModelBindingResult(object model, bool isSuccess) + { + Model = model; + IsSuccess = isSuccess; + } + + /// + /// Gets the model associated with this context. + /// + public object Model { get; } + + public bool IsSuccess { get; } + + public override string ToString() + { + if (IsSuccess) + { + return $"Success '{Model}'"; + } + else + { + return $"Failed"; + } + } + + public override bool Equals(object obj) + { + var other = obj as ModelBindingResult?; + if (other == null) + { + return false; + } + else + { + return Equals(other.Value); + } + } + + public override int GetHashCode() + { + var hashCodeCombiner = HashCodeCombiner.Start(); + hashCodeCombiner.Add(IsSuccess); + hashCodeCombiner.Add(Model); + + return hashCodeCombiner.CombinedHash; + } + + public bool Equals(ModelBindingResult other) + { + return + IsSuccess == other.IsSuccess && + object.Equals(Model, other.Model); + } + + /// + /// Compares objects for equality. + /// + /// A . + /// A . + /// true if the objects are equal, otherwise false. + public static bool operator ==(ModelBindingResult x, ModelBindingResult y) + { + return x.Equals(y); + } + + /// + /// Compares objects for inequality. + /// + /// A . + /// A . + /// true if the objects are not equal, otherwise false. + public static bool operator !=(ModelBindingResult x, ModelBindingResult y) + { + return !x.Equals(y); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs b/src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs index b5ebb1e..77c389b 100644 --- a/src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs +++ b/src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs @@ -25,7 +25,7 @@ namespace Microsoft.AspNetCore.Builder if (marker == null) { - throw new InvalidOperationException("Add Consistency must be called on the service collection."); + throw new InvalidOperationException("Add Cap must be called on the service collection."); } var provider = app.ApplicationServices; diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index 8128441..0d17e67 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -13,34 +13,45 @@ namespace DotNetCore.CAP /// /// Default value for polling delay timeout, in seconds. /// - public const int DefaultPollingDelay = 8; + public const int DefaultPollingDelay = 15; + + /// + /// Default processor count to process messages of cap.queue. + /// + public const int DefaultQueueProcessorCount = 2; public CapOptions() { PollingDelay = DefaultPollingDelay; + QueueProcessorCount = DefaultQueueProcessorCount; Extensions = new List(); } /// - /// Productor job polling delay time. Default is 5 sec. + /// Productor job polling delay time. Default is 15 sec. + /// + public int PollingDelay { get; set; } + + /// + /// Gets or sets the messages queue (Cap.Queue table) processor count. /// - public int PollingDelay { get; set; } = 5; + public int QueueProcessorCount { get; set; } /// - /// Failed messages polling delay time. Default is 2 min. + /// Failed messages polling delay time. Default is 3 min. /// - public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2); + public int FailedMessageWaitingInterval { get; set; } = (int)TimeSpan.FromMinutes(3).TotalSeconds; /// - /// We’ll send a POST request to the URL below with details of any subscribed events. + /// We’ll invoke this call-back with message type,name,content when requeue failed message. /// - public WebHook WebHook => throw new NotSupportedException(); + public Action FailedCallback { get; set; } /// - /// Registers an extension that will be executed when building services. - /// - /// - public void RegisterExtension(ICapOptionsExtension extension) + /// Registers an extension that will be executed when building services. + /// + /// + public void RegisterExtension(ICapOptionsExtension extension) { if (extension == null) throw new ArgumentNullException(nameof(extension)); @@ -48,11 +59,4 @@ namespace DotNetCore.CAP Extensions.Add(extension); } } - - public class WebHook - { - public string PayloadUrl { get; set; } - - public string Secret { get; set; } - } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 60a5fca..085205e 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -3,7 +3,6 @@ using System.Collections.Generic; using System.Reflection; using DotNetCore.CAP; using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Processor; @@ -35,7 +34,7 @@ namespace Microsoft.Extensions.DependencyInjection AddSubscribeServices(services); services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); @@ -60,7 +59,7 @@ namespace Microsoft.Extensions.DependencyInjection foreach (var serviceExtension in options.Extensions) { serviceExtension.AddServices(services); - } + } services.AddSingleton(options); return new CapBuilder(services); diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs index 09b9291..195916b 100644 --- a/src/DotNetCore.CAP/IBootstrapper.Default.cs +++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs @@ -40,7 +40,7 @@ namespace DotNetCore.CAP _cts.Cancel(); try { - _bootstrappingTask?.Wait(); + _bootstrappingTask?.GetAwaiter().GetResult(); } catch (OperationCanceledException ex) { diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index a45061c..a3a7210 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -8,28 +8,6 @@ namespace DotNetCore.CAP /// public interface ICapPublisher { - /// - /// (EntityFramework) Asynchronous publish a message. - /// - /// If you are using the EntityFramework, you need to configure the DbContextType first. - /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. - /// - /// - /// the topic name or exchange router key. - /// message body content. - Task PublishAsync(string name, string content); - - /// - /// (EntityFramework) Publish a message. - /// - /// If you are using the EntityFramework, you need to configure the DbContextType first. - /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. - /// - /// - /// the topic name or exchange router key. - /// message body content. - void Publish(string name, string content); - /// /// (EntityFramework) Asynchronous publish a object message. /// @@ -54,24 +32,6 @@ namespace DotNetCore.CAP /// message body content, that will be serialized of json. void Publish(string name, T contentObj); - /// - /// (ado.net) Asynchronous publish a message. - /// - /// the topic name or exchange router key. - /// message body content - /// the connection of - /// the transaction of - Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); - - /// - /// (ado.net) Publish a message. - /// - /// the topic name or exchange router key. - /// message body content. - /// the connection of - /// the transaction of - void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); - /// /// (ado.net) Asynchronous publish a object message. /// diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs index c59c20c..63cdf86 100644 --- a/src/DotNetCore.CAP/IConsumerClient.cs +++ b/src/DotNetCore.CAP/IConsumerClient.cs @@ -1,6 +1,6 @@ using System; +using System.Collections.Generic; using System.Threading; -using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP { @@ -9,14 +9,14 @@ namespace DotNetCore.CAP /// public interface IConsumerClient : IDisposable { - void Subscribe(string topic); - - void Subscribe(string topic, int partition); + void Subscribe(IEnumerable topics); void Listening(TimeSpan timeout, CancellationToken cancellationToken); void Commit(); - event EventHandler MessageReceieved; + event EventHandler OnMessageReceieved; + + event EventHandler OnError; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 6bcf22d..2990e8b 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; @@ -56,10 +57,7 @@ namespace DotNetCore.CAP { RegisterMessageProcessor(client); - foreach (var item in matchGroup.Value) - { - client.Subscribe(item.Attribute.Name); - } + client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); client.Listening(_pollingDelay, _cts.Token); } @@ -95,7 +93,7 @@ namespace DotNetCore.CAP private void RegisterMessageProcessor(IConsumerClient client) { - client.MessageReceieved += (sender, message) => + client.OnMessageReceieved += (sender, message) => { _logger.EnqueuingReceivedMessage(message.Name, message.Content); @@ -106,6 +104,11 @@ namespace DotNetCore.CAP } Pulse(); }; + + client.OnError += (sender, reason) => + { + _logger.LogError(reason); + }; } private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) @@ -116,7 +119,7 @@ namespace DotNetCore.CAP { StatusName = StatusName.Scheduled, }; - messageStore.StoreReceivedMessageAsync(receivedMessage).Wait(); + messageStore.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult(); return receivedMessage; } diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs index e335637..e8fd93c 100644 --- a/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs +++ b/src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs @@ -2,7 +2,6 @@ using System.Diagnostics; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor; diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index e481a56..33b022b 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -56,6 +56,7 @@ namespace DotNetCore.CAP /// Returns executed failed message. /// Task> GetFailedReceviedMessages(); + //----------------------------------------- /// diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs index b3b6c29..a617e13 100644 --- a/src/DotNetCore.CAP/Infrastructure/Helper.cs +++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs @@ -1,4 +1,5 @@ using System; +using System.ComponentModel; using System.Reflection; using Newtonsoft.Json; @@ -68,5 +69,10 @@ namespace DotNetCore.CAP.Infrastructure return !typeInfo.ContainsGenericParameters && typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase); } + + public static bool IsComplexType(Type type) + { + return !TypeDescriptor.GetConverter(type).CanConvertFrom(typeof(string)); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs b/src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs index d2492dd..18a2bd1 100644 --- a/src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs +++ b/src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs @@ -1,6 +1,4 @@ using System; -using System.Collections.Generic; -using System.Text; namespace DotNetCore.CAP.Infrastructure { @@ -11,4 +9,4 @@ namespace DotNetCore.CAP.Infrastructure throw new NotImplementedException(); } } -} +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs index 45b669d..0921e1b 100644 --- a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs +++ b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs @@ -1,7 +1,5 @@ using System; using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Abstractions.ModelBinding; -using DotNetCore.CAP.Infrastructure; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.Internal @@ -10,15 +8,15 @@ namespace DotNetCore.CAP.Internal { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; - private readonly IModelBinder _modelBinder; + private readonly IModelBinderFactory _modelBinderFactory; public ConsumerInvokerFactory( ILoggerFactory loggerFactory, - IModelBinder modelBinder, + IModelBinderFactory modelBinderFactory, IServiceProvider serviceProvider) { _logger = loggerFactory.CreateLogger(); - _modelBinder = modelBinder; + _modelBinderFactory = modelBinderFactory; _serviceProvider = serviceProvider; } @@ -26,7 +24,7 @@ namespace DotNetCore.CAP.Internal { var context = new ConsumerInvokerContext(consumerContext) { - Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinder, consumerContext) + Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinderFactory, consumerContext) }; return context.Result; diff --git a/src/DotNetCore.CAP/Internal/HashCodeCombiner.cs b/src/DotNetCore.CAP/Internal/HashCodeCombiner.cs new file mode 100644 index 0000000..205f541 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/HashCodeCombiner.cs @@ -0,0 +1,81 @@ +using System.Collections; +using System.Collections.Generic; +using System.Runtime.CompilerServices; + +namespace DotNetCore.CAP.Internal +{ + internal struct HashCodeCombiner + { + private long _combinedHash64; + + public int CombinedHash + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get { return _combinedHash64.GetHashCode(); } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private HashCodeCombiner(long seed) + { + _combinedHash64 = seed; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Add(IEnumerable e) + { + if (e == null) + { + Add(0); + } + else + { + var count = 0; + foreach (object o in e) + { + Add(o); + count++; + } + Add(count); + } + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static implicit operator int(HashCodeCombiner self) + { + return self.CombinedHash; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Add(int i) + { + _combinedHash64 = ((_combinedHash64 << 5) + _combinedHash64) ^ i; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Add(string s) + { + var hashCode = (s != null) ? s.GetHashCode() : 0; + Add(hashCode); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Add(object o) + { + var hashCode = (o != null) ? o.GetHashCode() : 0; + Add(hashCode); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Add(TValue value, IEqualityComparer comparer) + { + var hashCode = value != null ? comparer.GetHashCode(value) : 0; + Add(hashCode); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static HashCodeCombiner Start() + { + return new HashCodeCombiner(0x1505L); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index b6e74dc..3009f73 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -1,7 +1,6 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Abstractions.ModelBinding; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -11,16 +10,16 @@ namespace DotNetCore.CAP.Internal { private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; - private readonly IModelBinder _modelBinder; + private readonly IModelBinderFactory _modelBinderFactory; private readonly ConsumerContext _consumerContext; private readonly ObjectMethodExecutor _executor; public DefaultConsumerInvoker(ILogger logger, IServiceProvider serviceProvider, - IModelBinder modelBinder, + IModelBinderFactory modelBinderFactory, ConsumerContext consumerContext) { - _modelBinder = modelBinder; + _modelBinderFactory = modelBinderFactory; _serviceProvider = serviceProvider; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -29,32 +28,41 @@ namespace DotNetCore.CAP.Internal _consumerContext.ConsumerDescriptor.ImplTypeInfo); } - public Task InvokeAsync() + public async Task InvokeAsync() { using (_logger.BeginScope("consumer invoker begin")) { _logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, - _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); + _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); var value = _consumerContext.DeliverMessage.Content; - if (_executor.MethodParameters.Length > 0) { var firstParameter = _executor.MethodParameters[0]; - - var bindingContext = ModelBindingContext.CreateBindingContext(value, - firstParameter.Name, firstParameter.ParameterType); - - _modelBinder.BindModelAsync(bindingContext); - _executor.Execute(obj, bindingContext.Result); + try + { + var binder = _modelBinderFactory.CreateBinder(firstParameter); + var result = await binder.BindModelAsync(value); + if (result.IsSuccess) + { + _executor.Execute(obj, result.Model); + } + else + { + _logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value); + } + } + catch (FormatException ex) + { + _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex); + } } else { _executor.Execute(obj); } - return Task.CompletedTask; } } } diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index ab67b97..4746b5c 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -58,13 +58,7 @@ namespace DotNetCore.CAP.Internal continue; } - foreach (var method in typeInfo.DeclaredMethods) - { - var topicAttr = method.GetCustomAttribute(true); - if (topicAttr == null) continue; - - executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); - } + executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); } return executorDescriptorList; } @@ -82,18 +76,27 @@ namespace DotNetCore.CAP.Internal //double check if (!Helper.IsController(typeInfo)) continue; - foreach (var method in typeInfo.DeclaredMethods) - { - var topicAttr = method.GetCustomAttribute(true); - if (topicAttr == null) continue; - - executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo)); - } + executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); } return executorDescriptorList; } + private static IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo) + { + foreach (var method in typeInfo.DeclaredMethods) + { + var topicAttrs = method.GetCustomAttributes(true); + + if (topicAttrs.Count() == 0) continue; + + foreach (var attr in topicAttrs) + { + yield return InitDescriptor(attr, method, typeInfo); + } + } + } + private static ConsumerExecutorDescriptor InitDescriptor( TopicAttribute attr, MethodInfo methodInfo, diff --git a/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs b/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs new file mode 100644 index 0000000..c25f7c6 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs @@ -0,0 +1,32 @@ +using System; +using System.Reflection; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions.ModelBinding; +using DotNetCore.CAP.Infrastructure; + +namespace DotNetCore.CAP.Internal +{ + public class ComplexTypeModelBinder : IModelBinder + { + private readonly ParameterInfo _parameterInfo; + + public ComplexTypeModelBinder(ParameterInfo parameterInfo) + { + _parameterInfo = parameterInfo; + } + + public Task BindModelAsync(string content) + { + try + { + var type = _parameterInfo.ParameterType; + var value = Helper.FromJson(content, type); + return Task.FromResult(ModelBindingResult.Success(value)); + } + catch (Exception) + { + return Task.FromResult(ModelBindingResult.Failed()); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IModelBinder.Default.cs b/src/DotNetCore.CAP/Internal/IModelBinder.Default.cs deleted file mode 100644 index dd9a9bb..0000000 --- a/src/DotNetCore.CAP/Internal/IModelBinder.Default.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using System.Linq.Expressions; -using System.Reflection; -using System.Threading.Tasks; -using DotNetCore.CAP.Abstractions.ModelBinding; -using DotNetCore.CAP.Infrastructure; - -namespace DotNetCore.CAP.Internal -{ - public class DefaultModelBinder : IModelBinder - { - private Func _modelCreator; - - public Task BindModelAsync(ModelBindingContext bindingContext) - { - if (bindingContext.Model == null) - { - bindingContext.Model = CreateModel(bindingContext); - } - - bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType); - - return Task.CompletedTask; - } - - protected virtual object CreateModel(ModelBindingContext bindingContext) - { - if (bindingContext == null) - { - throw new ArgumentNullException(nameof(bindingContext)); - } - - if (_modelCreator != null) return _modelCreator(); - var modelTypeInfo = bindingContext.ModelType.GetTypeInfo(); - if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null) - { - throw new InvalidOperationException(); - } - - _modelCreator = Expression - .Lambda>(Expression.New(bindingContext.ModelType)) - .Compile(); - - return _modelCreator(); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IModelBinder.SimpleType.cs b/src/DotNetCore.CAP/Internal/IModelBinder.SimpleType.cs new file mode 100644 index 0000000..d00f8b8 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/IModelBinder.SimpleType.cs @@ -0,0 +1,86 @@ +using System; +using System.ComponentModel; +using System.Globalization; +using System.Reflection; +using System.Runtime.ExceptionServices; +using System.Threading.Tasks; +using DotNetCore.CAP.Abstractions.ModelBinding; + +namespace DotNetCore.CAP.Internal +{ + public class SimpleTypeModelBinder : IModelBinder + { + private readonly ParameterInfo _parameterInfo; + private readonly TypeConverter _typeConverter; + + public SimpleTypeModelBinder(ParameterInfo parameterInfo) + { + _parameterInfo = parameterInfo ?? throw new ArgumentNullException(nameof(parameterInfo)); + _typeConverter = TypeDescriptor.GetConverter(parameterInfo.ParameterType); + } + + public Task BindModelAsync(string content) + { + if (content == null) + { + throw new ArgumentNullException(nameof(content)); + } + + var parameterType = _parameterInfo.ParameterType; + + try + { + object model; + if (parameterType == typeof(string)) + { + if (string.IsNullOrWhiteSpace(content)) + { + model = null; + } + else + { + model = content; + } + } + else if (string.IsNullOrWhiteSpace(content)) + { + // Other than the StringConverter, converters Trim() the value then throw if the result is empty. + model = null; + } + else + { + model = _typeConverter.ConvertFrom( + context: null, + culture: CultureInfo.CurrentCulture, + value: content); + } + + if (model == null && !IsReferenceOrNullableType(parameterType)) + { + return Task.FromResult(ModelBindingResult.Failed()); + } + else + { + return Task.FromResult(ModelBindingResult.Success(model)); + } + } + catch (Exception exception) + { + var isFormatException = exception is FormatException; + if (!isFormatException && exception.InnerException != null) + { + // TypeConverter throws System.Exception wrapping the FormatException, + // so we capture the inner exception. + exception = ExceptionDispatchInfo.Capture(exception.InnerException).SourceException; + } + throw exception; + } + } + + private bool IsReferenceOrNullableType(Type type) + { + var isNullableValueType = Nullable.GetUnderlyingType(type) != null; + return !type.GetTypeInfo().IsValueType || isNullableValueType; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IModelBinderFactory.cs b/src/DotNetCore.CAP/Internal/IModelBinderFactory.cs new file mode 100644 index 0000000..0922ea7 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/IModelBinderFactory.cs @@ -0,0 +1,10 @@ +using System.Reflection; +using DotNetCore.CAP.Abstractions.ModelBinding; + +namespace DotNetCore.CAP.Internal +{ + public interface IModelBinderFactory + { + IModelBinder CreateBinder(ParameterInfo parameter); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ModelBinderFactory.cs b/src/DotNetCore.CAP/Internal/ModelBinderFactory.cs new file mode 100644 index 0000000..5af7337 --- /dev/null +++ b/src/DotNetCore.CAP/Internal/ModelBinderFactory.cs @@ -0,0 +1,114 @@ +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Runtime.CompilerServices; +using DotNetCore.CAP.Abstractions.ModelBinding; +using DotNetCore.CAP.Infrastructure; + +namespace DotNetCore.CAP.Internal +{ + /// + /// A factory for instances. + /// + public class ModelBinderFactory : IModelBinderFactory + { + private readonly ConcurrentDictionary _cache = + new ConcurrentDictionary(); + + public IModelBinder CreateBinder(ParameterInfo parameter) + { + if (parameter == null) + { + throw new ArgumentNullException(nameof(parameter)); + } + + object token = parameter; + + var binder = CreateBinderCoreCached(parameter, token); + if (binder == null) + { + throw new InvalidOperationException("Format Could Not Create IModelBinder"); + } + + return binder; + } + + private IModelBinder CreateBinderCoreCached(ParameterInfo parameterInfo, object token) + { + IModelBinder binder; + if (TryGetCachedBinder(parameterInfo, token, out binder)) + { + return binder; + } + if (!Helper.IsComplexType(parameterInfo.ParameterType)) + { + binder = new SimpleTypeModelBinder(parameterInfo); + } + else + { + binder = new ComplexTypeModelBinder(parameterInfo); + } + + AddToCache(parameterInfo, token, binder); + + return binder; + } + + private void AddToCache(ParameterInfo info, object cacheToken, IModelBinder binder) + { + if (cacheToken == null) + { + return; + } + + _cache.TryAdd(new Key(info, cacheToken), binder); + } + + private bool TryGetCachedBinder(ParameterInfo info, object cacheToken, out IModelBinder binder) + { + if (cacheToken == null) + { + binder = null; + return false; + } + + return _cache.TryGetValue(new Key(info, cacheToken), out binder); + } + + private struct Key : IEquatable + { + private readonly ParameterInfo _metadata; + private readonly object _token; + + public Key(ParameterInfo metadata, object token) + { + _metadata = metadata; + _token = token; + } + + public bool Equals(Key other) + { + return _metadata.Equals(other._metadata) && object.ReferenceEquals(_token, other._token); + } + + public override bool Equals(object obj) + { + var other = obj as Key?; + return other.HasValue && Equals(other.Value); + } + + public override int GetHashCode() + { + var hash = new HashCodeCombiner(); + hash.Add(_metadata); + hash.Add(RuntimeHelpers.GetHashCode(_token)); + return hash; + } + + public override string ToString() + { + return $"{_token} (Property: '{_metadata.Name}' Type: '{_metadata.ParameterType.Name}')"; + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs index c921247..184a386 100644 --- a/src/DotNetCore.CAP/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/LoggerExtensions.cs @@ -14,6 +14,7 @@ namespace DotNetCore.CAP private static readonly Action _enqueuingReceivdeMessage; private static readonly Action _executingConsumerMethod; private static readonly Action _receivedMessageRetryExecuting; + private static readonly Action _modelBinderFormattingException; private static Action _jobFailed; private static Action _jobFailedWillRetry; @@ -46,12 +47,12 @@ namespace DotNetCore.CAP _enqueuingSentMessage = LoggerMessage.Define( LogLevel.Debug, 2, - "Enqueuing a topic to the sent message store. NameKey: {NameKey}. Content: {Content}"); + "Enqueuing a topic to the sent message store. NameKey: '{NameKey}' Content: '{Content}'."); _enqueuingReceivdeMessage = LoggerMessage.Define( LogLevel.Debug, 2, - "Enqueuing a topic to the received message store. NameKey: {NameKey}. Content: {Content}"); + "Enqueuing a topic to the received message store. NameKey: '{NameKey}. Content: '{Content}'."); _executingConsumerMethod = LoggerMessage.Define( LogLevel.Error, @@ -63,6 +64,12 @@ namespace DotNetCore.CAP 5, "Received message topic method '{topicName}' failed to execute."); + _modelBinderFormattingException = LoggerMessage.Define( + LogLevel.Error, + 5, + "When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'." + ); + _jobRetrying = LoggerMessage.Define( LogLevel.Debug, 3, @@ -154,5 +161,10 @@ namespace DotNetCore.CAP { _exceptionOccuredWhileExecutingJob(logger, jobId, ex); } + + public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName, string content, Exception ex) + { + _modelBinderFormattingException(logger, methodName, parameterName, content, ex); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs index 24dfbcc..c23f24d 100644 --- a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs @@ -1,5 +1,4 @@ using System; -using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Models { diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs index bfb0103..069e4d0 100644 --- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs @@ -1,5 +1,4 @@ using System; -using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Models { diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 203b564..bbf04fd 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -49,7 +49,7 @@ namespace DotNetCore.CAP.Processor try { var worked = await Step(context); - + context.ThrowIfStopping(); Waiting = true; diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index d8328bc..0d9a808 100644 --- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -39,8 +39,9 @@ namespace DotNetCore.CAP.Processor public void Start() { - var processorCount = Environment.ProcessorCount; + var processorCount = _options.QueueProcessorCount; _processors = GetProcessors(processorCount); + _logger.ServerStarting(processorCount, _processors.Length); _context = new ProcessingContext(_provider, _cts.Token); @@ -61,7 +62,7 @@ namespace DotNetCore.CAP.Processor _logger.LogTrace("Pulsing the Queuer."); - PublishQueuer.PulseEvent.Set(); + PublishQueuer.PulseEvent.Set(); } public void Dispose() @@ -76,7 +77,7 @@ namespace DotNetCore.CAP.Processor _cts.Cancel(); try { - _compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds); + _compositeTask.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds); } catch (AggregateException ex) { diff --git a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs index 9d5f12d..8595338 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs @@ -1,7 +1,4 @@ using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; using System.Threading.Tasks; using DotNetCore.CAP.Processor.States; using Microsoft.Extensions.DependencyInjection; @@ -30,7 +27,7 @@ namespace DotNetCore.CAP.Processor _logger = logger; _provider = provider; _stateChanger = stateChanger; - _waitingInterval = _options.FailedMessageWaitingInterval; + _waitingInterval = TimeSpan.FromSeconds(_options.FailedMessageWaitingInterval); } public async Task ProcessAsync(ProcessingContext context) @@ -56,14 +53,32 @@ namespace DotNetCore.CAP.Processor private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context) { var messages = await connection.GetFailedPublishedMessages(); + var hasException = false; + foreach (var message in messages) { + if (!hasException) + { + try + { + _options.FailedCallback?.Invoke(Models.MessageType.Publish, message.Name, message.Content); + + } + catch (Exception ex) + { + hasException = true; + _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); + } + } + using (var transaction = connection.CreateTransaction()) { _stateChanger.ChangeState(message, new EnqueuedState(), transaction); await transaction.CommitAsync(); } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); } } @@ -71,16 +86,34 @@ namespace DotNetCore.CAP.Processor private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context) { var messages = await connection.GetFailedReceviedMessages(); + var hasException = false; + foreach (var message in messages) { + if (!hasException) + { + try + { + _options.FailedCallback?.Invoke(Models.MessageType.Subscribe, message.Name, message.Content); + + } + catch (Exception ex) + { + hasException = true; + _logger.LogWarning("Failed call-back method raised an exception:" + ex.Message); + } + } + using (var transaction = connection.CreateTransaction()) { _stateChanger.ChangeState(message, new EnqueuedState(), transaction); await transaction.CommitAsync(); } + context.ThrowIfStopping(); + await context.WaitAsync(_delay); } } } -} +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/QueueExecutorFactory.cs b/src/DotNetCore.CAP/QueueExecutorFactory.cs index d76f09f..ed28fe8 100644 --- a/src/DotNetCore.CAP/QueueExecutorFactory.cs +++ b/src/DotNetCore.CAP/QueueExecutorFactory.cs @@ -18,8 +18,8 @@ namespace DotNetCore.CAP { var queueExectors = _serviceProvider.GetServices(); - return messageType == MessageType.Publish - ? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor) + return messageType == MessageType.Publish + ? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor) : queueExectors.FirstOrDefault(x => !(x is BasePublishQueueExecutor)); } } diff --git a/test/DotNetCore.CAP.MySql.Test/ConnectionUtil.cs b/test/DotNetCore.CAP.MySql.Test/ConnectionUtil.cs new file mode 100644 index 0000000..eafee54 --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/ConnectionUtil.cs @@ -0,0 +1,47 @@ +using System; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql.Test +{ + public static class ConnectionUtil + { + private const string DatabaseVariable = "Cap_MySql_DatabaseName"; + private const string ConnectionStringTemplateVariable = "Cap_MySql_ConnectionStringTemplate"; + + private const string MasterDatabaseName = "information_schema"; + private const string DefaultDatabaseName = @"DotNetCore.CAP.MySql.Test"; + + private const string DefaultConnectionStringTemplate = + @"Server=localhost;Database={0};Uid=root;Pwd=123123;"; + + public static string GetDatabaseName() + { + return Environment.GetEnvironmentVariable(DatabaseVariable) ?? DefaultDatabaseName; + } + + public static string GetMasterConnectionString() + { + return string.Format(GetConnectionStringTemplate(), MasterDatabaseName); + } + + public static string GetConnectionString() + { + return string.Format(GetConnectionStringTemplate(), GetDatabaseName()); + } + + private static string GetConnectionStringTemplate() + { + return + Environment.GetEnvironmentVariable(ConnectionStringTemplateVariable) ?? + DefaultConnectionStringTemplate; + } + + public static MySqlConnection CreateConnection(string connectionString = null) + { + connectionString = connectionString ?? GetConnectionString(); + var connection = new MySqlConnection(connectionString); + connection.Open(); + return connection; + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs b/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs new file mode 100644 index 0000000..bd858e5 --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs @@ -0,0 +1,68 @@ +using System.Data; +using System.Threading; +using Dapper; +using Microsoft.EntityFrameworkCore; + +namespace DotNetCore.CAP.MySql.Test +{ + public abstract class DatabaseTestHost : TestHost + { + private static bool _sqlObjectInstalled; + public static object _lock = new object(); + + protected override void PostBuildServices() + { + base.PostBuildServices(); + lock (_lock) + { + if (!_sqlObjectInstalled) + { + InitializeDatabase(); + } + } + } + + public override void Dispose() + { + DeleteAllData(); + base.Dispose(); + } + + private void InitializeDatabase() + { + using (CreateScope()) + { + var storage = GetService(); + var token = new CancellationTokenSource().Token; + CreateDatabase(); + storage.InitializeAsync(token).GetAwaiter().GetResult(); + _sqlObjectInstalled = true; + } + } + + private void CreateDatabase() + { + var masterConn = ConnectionUtil.GetMasterConnectionString(); + var databaseName = ConnectionUtil.GetDatabaseName(); + using (var connection = ConnectionUtil.CreateConnection(masterConn)) + { + connection.Execute($@" +DROP DATABASE IF EXISTS `{databaseName}`; +CREATE DATABASE `{databaseName}`;"); + } + } + + private void DeleteAllData() + { + var conn = ConnectionUtil.GetConnectionString(); + + using (var connection = ConnectionUtil.CreateConnection(conn)) + { + connection.Execute($@" +TRUNCATE TABLE `cap.published`; +TRUNCATE TABLE `cap.received`; +TRUNCATE TABLE `cap.queue`;"); + } + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj new file mode 100644 index 0000000..cf89b39 --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj @@ -0,0 +1,46 @@ + + + + netcoreapp1.1 + true + DotNetCore.CAP.MySql.Test + DotNetCore.CAP.MySql.Test + true + $(PackageTargetFallback);dnxcore50;portable-net451+win8 + 1.1.1 + false + false + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs new file mode 100644 index 0000000..b83588e --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs @@ -0,0 +1,134 @@ +using System; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using Xunit; + +namespace DotNetCore.CAP.MySql.Test +{ + [Collection("MySql")] + public class MySqlStorageConnectionTest : DatabaseTestHost + { + private MySqlStorageConnection _storage; + + public MySqlStorageConnectionTest() + { + var options = GetService(); + _storage = new MySqlStorageConnection(options); + } + + [Fact] + public async Task GetPublishedMessageAsync_Test() + { + var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; + var publishMessage = new CapPublishedMessage + { + Name = "MySqlStorageConnectionTest", + Content = "", + StatusName = StatusName.Scheduled + }; + var insertedId = default(int); + using (var connection = ConnectionUtil.CreateConnection()) + { + insertedId = connection.QueryFirst(sql, publishMessage); + } + var message = await _storage.GetPublishedMessageAsync(insertedId); + Assert.NotNull(message); + Assert.Equal("MySqlStorageConnectionTest", message.Name); + Assert.Equal(StatusName.Scheduled, message.StatusName); + } + + [Fact] + public async Task FetchNextMessageAsync_Test() + { + var sql = "INSERT INTO `Cap.Queue`(`MessageId`,`MessageType`) VALUES(@MessageId,@MessageType);"; + var queue = new CapQueue + { + MessageId = 3333, + MessageType = MessageType.Publish + }; + using (var connection = ConnectionUtil.CreateConnection()) + { + connection.Execute(sql, queue); + } + var fetchedMessage = await _storage.FetchNextMessageAsync(); + fetchedMessage.Dispose(); + Assert.NotNull(fetchedMessage); + Assert.Equal(MessageType.Publish, fetchedMessage.MessageType); + Assert.Equal(3333, fetchedMessage.MessageId); + } + + [Fact] + public async Task StoreReceivedMessageAsync_Test() + { + var receivedMessage = new CapReceivedMessage + { + Name = "MySqlStorageConnectionTest", + Content = "", + Group = "mygroup", + StatusName = StatusName.Scheduled + }; + + Exception exception = null; + try + { + await _storage.StoreReceivedMessageAsync(receivedMessage); + } + catch (Exception ex) + { + exception = ex; + } + Assert.Null(exception); + } + + [Fact] + public async Task GetReceivedMessageAsync_Test() + { + + var sql = $@" + INSERT INTO `cap.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) + VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; + var receivedMessage = new CapReceivedMessage + { + Name = "MySqlStorageConnectionTest", + Content = "", + Group = "mygroup", + StatusName = StatusName.Scheduled + }; + var insertedId = default(int); + using (var connection = ConnectionUtil.CreateConnection()) + { + insertedId = connection.QueryFirst(sql, receivedMessage); + } + + var message = await _storage.GetReceivedMessageAsync(insertedId); + + Assert.NotNull(message); + Assert.Equal(StatusName.Scheduled, message.StatusName); + Assert.Equal("MySqlStorageConnectionTest", message.Name); + Assert.Equal("mygroup", message.Group); + } + + [Fact] + public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test() + { + var receivedMessage = new CapReceivedMessage + { + Name = "MySqlStorageConnectionTest", + Content = "", + Group = "mygroup", + StatusName = StatusName.Scheduled + }; + await _storage.StoreReceivedMessageAsync(receivedMessage); + + var message = await _storage.GetNextReceviedMessageToBeEnqueuedAsync(); + + Assert.NotNull(message); + Assert.Equal(StatusName.Scheduled, message.StatusName); + Assert.Equal("MySqlStorageConnectionTest", message.Name); + Assert.Equal("mygroup", message.Group); + } + + } +} diff --git a/test/DotNetCore.CAP.MySql.Test/MySqlStorageTest.cs b/test/DotNetCore.CAP.MySql.Test/MySqlStorageTest.cs new file mode 100644 index 0000000..9286929 --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/MySqlStorageTest.cs @@ -0,0 +1,71 @@ +using Xunit; +using Dapper; + +namespace DotNetCore.CAP.MySql.Test +{ + [Collection("MySql")] + public class MySqlStorageTest : DatabaseTestHost + { + private readonly string _dbName; + private readonly string _masterDbConnectionString; + + + public MySqlStorageTest() + { + _dbName = ConnectionUtil.GetDatabaseName(); + _masterDbConnectionString = ConnectionUtil.GetMasterConnectionString(); + } + + [Fact] + public void Database_IsExists() + { + using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) + { + var databaseName = ConnectionUtil.GetDatabaseName(); + var sql = $@"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME = '{databaseName}'"; + var result = connection.QueryFirstOrDefault(sql); + Assert.NotNull(result); + Assert.True(databaseName.Equals(result, System.StringComparison.CurrentCultureIgnoreCase)); + } + } + + [Fact] + public void DatabaseTable_Published_IsExists() + { + var tableName = "cap.published"; + using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) + { + var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; + var result = connection.QueryFirstOrDefault(sql); + Assert.NotNull(result); + Assert.Equal(tableName, result); + } + } + + [Fact] + public void DatabaseTable_Queue_IsExists() + { + var tableName = "cap.queue"; + using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) + { + var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; + var result = connection.QueryFirstOrDefault(sql); + Assert.NotNull(result); + Assert.Equal(tableName, result); + } + } + + [Fact] + public void DatabaseTable_Received_IsExists() + { + var tableName = "cap.received"; + using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) + { + var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; + var result = connection.QueryFirstOrDefault(sql); + Assert.NotNull(result); + Assert.Equal(tableName, result); + } + } + } +} diff --git a/test/DotNetCore.CAP.MySql.Test/TestHost.cs b/test/DotNetCore.CAP.MySql.Test/TestHost.cs new file mode 100644 index 0000000..c8290ad --- /dev/null +++ b/test/DotNetCore.CAP.MySql.Test/TestHost.cs @@ -0,0 +1,98 @@ +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP.MySql.Test +{ + public abstract class TestHost : IDisposable + { + protected IServiceCollection _services; + protected string _connectionString; + private IServiceProvider _provider; + private IServiceProvider _scopedProvider; + + public TestHost() + { + CreateServiceCollection(); + PreBuildServices(); + BuildServices(); + PostBuildServices(); + } + + protected IServiceProvider Provider => _scopedProvider ?? _provider; + + private void CreateServiceCollection() + { + var services = new ServiceCollection(); + + services.AddOptions(); + services.AddLogging(); + + _connectionString = ConnectionUtil.GetConnectionString(); + services.AddSingleton(new MySqlOptions { ConnectionString = _connectionString }); + services.AddSingleton(); + + _services = services; + } + + protected virtual void PreBuildServices() + { + } + + private void BuildServices() + { + _provider = _services.BuildServiceProvider(); + } + + protected virtual void PostBuildServices() + { + } + + public IDisposable CreateScope() + { + var scope = CreateScope(_provider); + var loc = scope.ServiceProvider; + _scopedProvider = loc; + return new DelegateDisposable(() => + { + if (_scopedProvider == loc) + { + _scopedProvider = null; + } + scope.Dispose(); + }); + } + + public IServiceScope CreateScope(IServiceProvider provider) + { + var scope = provider.GetService().CreateScope(); + return scope; + } + + public T GetService() => Provider.GetService(); + + public T Ensure(ref T service) + where T : class + => service ?? (service = GetService()); + + public virtual void Dispose() + { + (_provider as IDisposable)?.Dispose(); + } + + private class DelegateDisposable : IDisposable + { + private Action _dispose; + + public DelegateDisposable(Action dispose) + { + _dispose = dispose; + } + + public void Dispose() + { + _dispose(); + } + } + } +} \ No newline at end of file diff --git a/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs b/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs index b32fd74..d759384 100644 --- a/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs +++ b/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs @@ -35,7 +35,7 @@ namespace DotNetCore.CAP.SqlServer.Test var storage = GetService(); var token = new CancellationTokenSource().Token; CreateDatabase(); - storage.InitializeAsync(token).Wait(); + storage.InitializeAsync(token).GetAwaiter().GetResult(); _sqlObjectInstalled = true; } }