Quellcode durchsuchen

Merge pull request #20 from dotnetcore/develop

Release 1.1.0
master
Savorboard vor 7 Jahren
committed by GitHub
Ursprung
Commit
9aaf1a6e33
91 geänderte Dateien mit 2318 neuen und 572 gelöschten Zeilen
  1. +1
    -0
      .gitignore
  2. +29
    -8
      CAP.sln
  3. +2
    -0
      appveyor.yml
  4. +2
    -2
      build/common.props
  5. +2
    -2
      build/version.props
  6. +13
    -0
      samples/Sample.Kafka.SqlServer/AppDbContext.cs
  7. +47
    -0
      samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs
  8. +9
    -1
      samples/Sample.Kafka.SqlServer/Program.cs
  9. +3
    -7
      samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
  10. +33
    -0
      samples/Sample.Kafka.SqlServer/Startup.cs
  11. +0
    -13
      samples/Sample.Kafka/AppDbContext.cs
  12. +0
    -54
      samples/Sample.Kafka/Controllers/ValuesController.cs
  13. +0
    -29
      samples/Sample.Kafka/Properties/launchSettings.json
  14. +0
    -50
      samples/Sample.Kafka/Startup.cs
  15. +0
    -10
      samples/Sample.Kafka/appsettings.Development.json
  16. +0
    -8
      samples/Sample.Kafka/appsettings.json
  17. +16
    -0
      samples/Sample.RabbitMQ.MySql/AppDbContext.cs
  18. +50
    -0
      samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
  19. +32
    -0
      samples/Sample.RabbitMQ.MySql/Program.cs
  20. +28
    -0
      samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
  21. +37
    -0
      samples/Sample.RabbitMQ.MySql/Startup.cs
  22. +3
    -5
      src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
  23. +12
    -8
      src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs
  24. +9
    -0
      src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs
  25. +3
    -0
      src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs
  26. +1
    -1
      src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
  27. +21
    -12
      src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
  28. +5
    -4
      src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
  29. +22
    -18
      src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs
  30. +13
    -0
      src/DotNetCore.CAP.MySql/CAP.EFOptions.cs
  31. +44
    -0
      src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
  32. +13
    -0
      src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
  33. +49
    -0
      src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs
  34. +203
    -0
      src/DotNetCore.CAP.MySql/CapPublisher.cs
  35. +27
    -0
      src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
  36. +11
    -0
      src/DotNetCore.CAP.MySql/FetchedMessage.cs
  37. +61
    -0
      src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs
  38. +74
    -0
      src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
  39. +66
    -0
      src/DotNetCore.CAP.MySql/MySqlStorage.cs
  40. +152
    -0
      src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
  41. +71
    -0
      src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs
  42. +13
    -6
      src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
  43. +4
    -7
      src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
  44. +3
    -0
      src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs
  45. +1
    -1
      src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
  46. +4
    -4
      src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
  47. +35
    -16
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
  48. +3
    -3
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
  49. +1
    -1
      src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs
  50. +4
    -5
      src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
  51. +39
    -52
      src/DotNetCore.CAP.SqlServer/CapPublisher.cs
  52. +1
    -1
      src/DotNetCore.CAP.SqlServer/FetchedMessage.cs
  53. +2
    -3
      src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
  54. +0
    -1
      src/DotNetCore.CAP/Abstractions/ConsumerContext.cs
  55. +1
    -10
      src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs
  56. +0
    -55
      src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingContext.cs
  57. +105
    -0
      src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingResult.cs
  58. +1
    -1
      src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs
  59. +22
    -18
      src/DotNetCore.CAP/CAP.Options.cs
  60. +2
    -3
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  61. +1
    -1
      src/DotNetCore.CAP/IBootstrapper.Default.cs
  62. +0
    -40
      src/DotNetCore.CAP/ICapPublisher.cs
  63. +5
    -5
      src/DotNetCore.CAP/IConsumerClient.cs
  64. +9
    -6
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  65. +0
    -1
      src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs
  66. +1
    -0
      src/DotNetCore.CAP/IStorageConnection.cs
  67. +6
    -0
      src/DotNetCore.CAP/Infrastructure/Helper.cs
  68. +1
    -3
      src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs
  69. +4
    -6
      src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
  70. +81
    -0
      src/DotNetCore.CAP/Internal/HashCodeCombiner.cs
  71. +22
    -14
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
  72. +17
    -14
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
  73. +32
    -0
      src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs
  74. +0
    -47
      src/DotNetCore.CAP/Internal/IModelBinder.Default.cs
  75. +86
    -0
      src/DotNetCore.CAP/Internal/IModelBinder.SimpleType.cs
  76. +10
    -0
      src/DotNetCore.CAP/Internal/IModelBinderFactory.cs
  77. +114
    -0
      src/DotNetCore.CAP/Internal/ModelBinderFactory.cs
  78. +14
    -2
      src/DotNetCore.CAP/LoggerExtensions.cs
  79. +0
    -1
      src/DotNetCore.CAP/Models/CapPublishedMessage.cs
  80. +0
    -1
      src/DotNetCore.CAP/Models/CapReceivedMessage.cs
  81. +1
    -1
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  82. +4
    -3
      src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
  83. +38
    -5
      src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs
  84. +2
    -2
      src/DotNetCore.CAP/QueueExecutorFactory.cs
  85. +47
    -0
      test/DotNetCore.CAP.MySql.Test/ConnectionUtil.cs
  86. +68
    -0
      test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs
  87. +46
    -0
      test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
  88. +134
    -0
      test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs
  89. +71
    -0
      test/DotNetCore.CAP.MySql.Test/MySqlStorageTest.cs
  90. +98
    -0
      test/DotNetCore.CAP.MySql.Test/TestHost.cs
  91. +1
    -1
      test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs

+ 1
- 0
.gitignore Datei anzeigen

@@ -33,3 +33,4 @@ bin/
/.idea/.idea.CAP
/.idea/.idea.CAP
/.idea
Properties

+ 29
- 8
CAP.sln Datei anzeigen

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.15
VisualStudioVersion = 15.0.26430.16
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
@@ -35,8 +35,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP", "src\DotNe
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{3A6B6931-A123-477A-9469-8B468B5385AF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka", "samples\Sample.Kafka\Sample.Kafka.csproj", "{2F095ED9-5BC9-4512-9013-A47685FB2508}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Kafka", "src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj", "{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RabbitMQ", "src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj", "{9961B80E-0718-4280-B2A0-271B003DE26B}"
@@ -59,6 +57,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.SqlServer.Test", "test\DotNetCore.CAP.SqlServer.Test\DotNetCore.CAP.SqlServer.Test.csproj", "{DA00FA38-C4B9-4F55-8756-D480FBC1084F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql", "src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj", "{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MySql.Test", "test\DotNetCore.CAP.MySql.Test\DotNetCore.CAP.MySql.Test.csproj", "{80A84F62-1558-427B-BA74-B47AA8A665B5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "samples\Sample.RabbitMQ.MySql\Sample.RabbitMQ.MySql.csproj", "{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -69,10 +75,6 @@ Global
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66}.Release|Any CPU.Build.0 = Release|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F095ED9-5BC9-4512-9013-A47685FB2508}.Release|Any CPU.Build.0 = Release|Any CPU
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -92,6 +94,22 @@ Global
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DA00FA38-C4B9-4F55-8756-D480FBC1084F}.Release|Any CPU.Build.0 = Release|Any CPU
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B}.Release|Any CPU.Build.0 = Release|Any CPU
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80A84F62-1558-427B-BA74-B47AA8A665B5}.Release|Any CPU.Build.0 = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873}.Release|Any CPU.Build.0 = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -99,11 +117,14 @@ Global
GlobalSection(NestedProjects) = preSolution
{9E5A7F49-8E31-4A71-90CC-1DA9AEDA99EE} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{E8AF8611-0EA4-4B19-BC48-87C57A87DC66} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{2F095ED9-5BC9-4512-9013-A47685FB2508} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{C42CDE33-0878-4BA0-96F2-4CB7C8FDEAAD} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{9961B80E-0718-4280-B2A0-271B003DE26B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{F608B509-A99B-4AC7-8227-42051DD4A578} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{3B577468-6792-4EF1-9237-15180B176A24} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{DA00FA38-C4B9-4F55-8756-D480FBC1084F} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{FA15685A-778A-4D2A-A2FE-27FAD2FFA65B} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
EndGlobal

+ 2
- 0
appveyor.yml Datei anzeigen

@@ -4,8 +4,10 @@ environment:
BUILDING_ON_PLATFORM: win
BuildEnvironment: appveyor
Cap_SqlServer_ConnectionStringTemplate: Server=(local)\SQL2014;Database={0};User ID=sa;Password=Password12!
Cap_MySql_ConnectionStringTemplate: Server=localhost;Database={0};Uid=root;Pwd=Password12!
services:
- mssql2014
- mysql
build_script:
- ps: ./ConfigureMSDTC.ps1
- ps: ./build.ps1


+ 2
- 2
build/common.props Datei anzeigen

@@ -14,8 +14,8 @@
<PackageIconUrl>https://avatars2.githubusercontent.com/u/19404084</PackageIconUrl>
<PackageProjectUrl>https://github.com/dotnetcore/CAP</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/dotnetcore/CAP/blob/master/LICENSE.txt</PackageLicenseUrl>
<PackageTags>aspnetcore;cap;consistency</PackageTags>
<Description>Eventually consistency in distributed architectures.</Description>
<PackageTags>eventbus;rabbitmq;kafka;cap;transaction;</PackageTags>
<Description>EventBus and eventually consistency in distributed architectures.</Description>
</PropertyGroup>

</Project>

+ 2
- 2
build/version.props Datei anzeigen

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionMinor>1</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 13
- 0
samples/Sample.Kafka.SqlServer/AppDbContext.cs Datei anzeigen

@@ -0,0 +1,13 @@
using Microsoft.EntityFrameworkCore;

namespace Sample.Kafka
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
//optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Sample.Kafka.SqlServer;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}

+ 47
- 0
samples/Sample.Kafka.SqlServer/Controllers/ValuesController.cs Datei anzeigen

@@ -0,0 +1,47 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Kafka.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;
private readonly AppDbContext _dbContext;

public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_capBus = producer;
_dbContext = dbContext;
}

[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.rabbitmq.mysql", "");
return Ok();
}

[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
trans.Commit();
}
return Ok();
}

[NonAction]
[CapSubscribe("sample.kafka.sqlserver", Group = "test")]
public void KafkaTest()
{
Console.WriteLine("[sample.kafka.sqlserver] message received");
Debug.WriteLine("[sample.kafka.sqlserver] message received");
}
}
}

samples/Sample.Kafka/Program.cs → samples/Sample.Kafka.SqlServer/Program.cs Datei anzeigen

@@ -1,13 +1,21 @@
using System.IO;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;

namespace Sample.Kafka
{
public class Program
{
public static void Main(string[] args) {
public static void Main(string[] args)
{
var config = new ConfigurationBuilder()
.AddCommandLine(args)
.AddEnvironmentVariables("ASPNETCORE_")
.Build();

var host = new WebHostBuilder()
.UseConfiguration(config)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()

samples/Sample.Kafka/Sample.Kafka.csproj → samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj Datei anzeigen

@@ -2,14 +2,9 @@

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<AssemblyName>Sample.Kafka.SqlServer</AssemblyName>
</PropertyGroup>

<ItemGroup>
<Compile Remove="wwwroot\**" />
<Content Remove="wwwroot\**" />
<EmbeddedResource Remove="wwwroot\**" />
<None Remove="wwwroot\**" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
@@ -17,6 +12,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
</ItemGroup>
<ItemGroup>

+ 33
- 0
samples/Sample.Kafka.SqlServer/Startup.cs Datei anzeigen

@@ -0,0 +1,33 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.Kafka
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();

services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
});

services.AddMvc();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();

app.UseMvc();

app.UseCap();
}
}
}

+ 0
- 13
samples/Sample.Kafka/AppDbContext.cs Datei anzeigen

@@ -1,13 +0,0 @@
using Microsoft.EntityFrameworkCore;

namespace Sample.Kafka
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True");
//optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
}
}
}

+ 0
- 54
samples/Sample.Kafka/Controllers/ValuesController.cs Datei anzeigen

@@ -1,54 +0,0 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Kafka.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _producer;
private readonly AppDbContext _dbContext ;

public ValuesController(ICapPublisher producer, AppDbContext dbContext)
{
_producer = producer;
_dbContext = dbContext;
}

[Route("/")]
public IActionResult Index()
{
return Ok();
}
public string ServerPath => ((IHostingEnvironment)HttpContext.RequestServices.GetService(typeof(IHostingEnvironment))).ContentRootPath;

[CapSubscribe("zzwl.topic.finace.callBack", Group = "test")]
public void KafkaTest(Person person)
{
Console.WriteLine(DateTime.Now);
}

[Route("~/send")]
public async Task<IActionResult> SendTopic()
{
using (var trans = _dbContext.Database.BeginTransaction())
{
await _producer.PublishAsync("zzwl.topic.finace.callBack","");

trans.Commit();
}

return Ok();
}

public class Person
{
public string Name { get; set; }

public int Age { get; set; }
}
}
}

+ 0
- 29
samples/Sample.Kafka/Properties/launchSettings.json Datei anzeigen

@@ -1,29 +0,0 @@
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:49909/",
"sslPort": 0
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "api/values",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Sample.Kafka": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "api/values",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "http://localhost:49910"
}
}
}

+ 0
- 50
samples/Sample.Kafka/Startup.cs Datei anzeigen

@@ -1,50 +0,0 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.Kafka
{
public class Startup
{
public Startup(IHostingEnvironment env)
{
var builder = new ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
.AddEnvironmentVariables();
Configuration = builder.Build();
}

public IConfigurationRoot Configuration { get; }

// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();

services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
//x.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True");
x.UseKafka("localhost:9092");
});

// Add framework services.
services.AddMvc();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(Configuration.GetSection("Logging"));
loggerFactory.AddDebug();

app.UseMvc();

app.UseCap();
}
}
}

+ 0
- 10
samples/Sample.Kafka/appsettings.Development.json Datei anzeigen

@@ -1,10 +0,0 @@
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Microsoft": "Information"
}
}
}

+ 0
- 8
samples/Sample.Kafka/appsettings.json Datei anzeigen

@@ -1,8 +0,0 @@
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Warning"
}
}
}

+ 16
- 0
samples/Sample.RabbitMQ.MySql/AppDbContext.cs Datei anzeigen

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace Sample.RabbitMQ.MySql
{
public class AppDbContext : DbContext
{
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql("Server=localhost;Database=Sample.RabbitMQ.MySql;Uid=root;Pwd=123123;");
}
}
}

+ 50
- 0
samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs Datei anzeigen

@@ -0,0 +1,50 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Sample.RabbitMQ.MySql.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller
{
private readonly AppDbContext _dbContext;
private readonly ICapPublisher _capBus;

public ValuesController(AppDbContext dbContext, ICapPublisher capPublisher)
{
_dbContext = dbContext;
_capBus = capPublisher;
}

[Route("~/publish")]
public IActionResult PublishMessage()
{
_capBus.Publish("sample.kafka.sqlserver", "");

return Ok();
}

[Route("~/publishWithTrans")]
public async Task<IActionResult> PublishMessageWithTransaction()
{
using (var trans = await _dbContext.Database.BeginTransactionAsync())
{
await _capBus.PublishAsync("sample.kafka.sqlserver", "");
trans.Commit();
}
return Ok();
}

[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void ReceiveMessage()
{
Console.WriteLine("[sample.rabbitmq.mysql] message received");
Debug.WriteLine("[sample.rabbitmq.mysql] message received");
}
}
}

+ 32
- 0
samples/Sample.RabbitMQ.MySql/Program.cs Datei anzeigen

@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;

namespace Sample.RabbitMQ.MySql
{
public class Program
{
public static void Main(string[] args)
{
var config = new ConfigurationBuilder()
.AddCommandLine(args)
.AddEnvironmentVariables("ASPNETCORE_")
.Build();

var host = new WebHostBuilder()
.UseConfiguration(config)
.UseKestrel()
.UseContentRoot(Directory.GetCurrentDirectory())
.UseIISIntegration()
.UseStartup<Startup>()
.Build();

host.Run();
}
}
}

+ 28
- 0
samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj Datei anzeigen

@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="1.1.2" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="1.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="1.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="1.1.2" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql.Design" Version="1.1.2" />
</ItemGroup>
<ItemGroup>
<DotNetCliToolReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Tools" Version="1.0.1" />
<DotNetCliToolReference Include="Microsoft.EntityFrameworkCore.Tools.DotNet" Version="1.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 37
- 0
samples/Sample.RabbitMQ.MySql/Startup.cs Datei anzeigen

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Sample.RabbitMQ.MySql
{
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<AppDbContext>();

services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseKafka("localhost:9092");
});

services.AddMvc();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole();
loggerFactory.AddDebug();

app.UseMvc();

app.UseCap();
}
}
}

+ 3
- 5
src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs Datei anzeigen

@@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class KafkaCapOptionsExtension : ICapOptionsExtension
internal sealed class KafkaCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<KafkaOptions> _configure;

@@ -16,12 +16,10 @@ namespace DotNetCore.CAP

public void AddServices(IServiceCollection services)
{
services.Configure(_configure);

var kafkaOptions = new KafkaOptions();
_configure(kafkaOptions);
_configure?.Invoke(kafkaOptions);
services.AddSingleton(kafkaOptions);
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();
}


+ 12
- 8
src/DotNetCore.CAP.Kafka/CAP.KafkaOptions.cs Datei anzeigen

@@ -21,30 +21,34 @@ namespace DotNetCore.CAP
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para>
/// </summary>
public IDictionary<string, object> MainConfig { get; private set; }
public readonly IDictionary<string, object> MainConfig;

/// <summary>
/// The `bootstrap.servers` item config of `MainConfig`.
/// The `bootstrap.servers` item config of <see cref="MainConfig"/>.
/// <para>
/// Initial list of brokers as a CSV list of broker host or host:port.
/// </para>
/// </summary>
public string Servers { get; set; }

internal IEnumerable<KeyValuePair<string, object>> AsRdkafkaConfig()
internal IEnumerable<KeyValuePair<string, object>> AskafkaConfig()
{
if (MainConfig.ContainsKey("bootstrap.servers"))
{
return MainConfig.AsEnumerable();
}

if (string.IsNullOrEmpty(Servers))
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}
else
{
MainConfig.Add("bootstrap.servers", Servers);
}
MainConfig.Add("bootstrap.servers", Servers);

MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["socket.blocking.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";

return MainConfig.AsEnumerable();
}
}

+ 9
- 0
src/DotNetCore.CAP.Kafka/CAP.Options.Extensions.cs Datei anzeigen

@@ -6,6 +6,10 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
/// <summary>
/// Configuration to use kafka in CAP.
/// </summary>
/// <param name="bootstrapServers">Kafka bootstrap server urls.</param>
public static CapOptions UseKafka(this CapOptions options, string bootstrapServers)
{
return options.UseKafka(opt =>
@@ -14,6 +18,11 @@ namespace Microsoft.Extensions.DependencyInjection
});
}

/// <summary>
/// Configuration to use kafka in CAP.
/// </summary>
/// <param name="configure">Provides programmatic configuration for the kafka .</param>
/// <returns></returns>
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));


+ 3
- 0
src/DotNetCore.CAP.Kafka/CAP.SubscribeAttribute.cs Datei anzeigen

@@ -3,6 +3,9 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// An attribute for subscribe Kafka messages.
/// </summary>
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string name)


+ 1
- 1
src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj Datei anzeigen

@@ -14,7 +14,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="0.9.5" />
<PackageReference Include="Confluent.Kafka" Version="0.11.0" />
</ItemGroup>

<ItemGroup>


+ 21
- 12
src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs Datei anzeigen

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
@@ -6,36 +7,37 @@ using Confluent.Kafka.Serialization;

namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClient : IConsumerClient
internal sealed class KafkaConsumerClient : IConsumerClient
{
private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
private Consumer<Null, string> _consumerClient;

public event EventHandler<MessageContext> MessageReceieved;
public event EventHandler<MessageContext> OnMessageReceieved;

public event EventHandler<string> OnError;

public IDeserializer<string> StringDeserializer { get; set; }

public KafkaConsumerClient(string groupId, KafkaOptions options)
{
_groupId = groupId;
_kafkaOptions = options;
_kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
StringDeserializer = new StringDeserializer(Encoding.UTF8);
}

public void Subscribe(string topic)
public void Subscribe(IEnumerable<string> topics)
{
Subscribe(topic, 0);
}
if (topics == null)
throw new ArgumentNullException(nameof(topics));

public void Subscribe(string topicName, int partition)
{
if (_consumerClient == null)
{
InitKafkaClient();
}
_consumerClient.Assignment.Add(new TopicPartition(topicName, partition));
_consumerClient.Subscribe(topicName);

//_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
_consumerClient.Subscribe(topics);
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
@@ -63,10 +65,11 @@ namespace DotNetCore.CAP.Kafka
{
_kafkaOptions.MainConfig.Add("group.id", _groupId);

var config = _kafkaOptions.AsRdkafkaConfig();
var config = _kafkaOptions.AskafkaConfig();
_consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);

_consumerClient.OnMessage += ConsumerClient_OnMessage;
_consumerClient.OnError += ConsumerClient_OnError;
}

private void ConsumerClient_OnMessage(object sender, Message<Null, string> e)
@@ -77,7 +80,13 @@ namespace DotNetCore.CAP.Kafka
Name = e.Topic,
Content = e.Value
};
MessageReceieved?.Invoke(sender, message);

OnMessageReceieved?.Invoke(sender, message);
}

private void ConsumerClient_OnError(object sender, Error e)
{
OnError?.Invoke(sender, e.Reason);
}

#endregion private methods


+ 5
- 4
src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs Datei anzeigen

@@ -1,14 +1,15 @@
using Microsoft.Extensions.Options;
using System;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
public class KafkaConsumerClientFactory : IConsumerClientFactory
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
private readonly KafkaOptions _kafkaOptions;

public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions)
public KafkaConsumerClientFactory(KafkaOptions kafkaOptions)
{
_kafkaOptions = kafkaOptions.Value;
_kafkaOptions = kafkaOptions;
}

public IConsumerClient Create(string groupId)


+ 22
- 18
src/DotNetCore.CAP.Kafka/PublishQueueExecutor.cs Datei anzeigen

@@ -2,52 +2,56 @@
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Kafka
{
public class PublishQueueExecutor : BasePublishQueueExecutor
internal class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly KafkaOptions _kafkaOptions;

public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<KafkaOptions> options,
KafkaOptions options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_kafkaOptions = options.Value;
_kafkaOptions = options;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
{
try
{
var config = _kafkaOptions.AsRdkafkaConfig();
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
var config = _kafkaOptions.AskafkaConfig();
var contentBytes = Encoding.UTF8.GetBytes(content);
using (var producer = new Producer(config))
{
producer.ProduceAsync(keyName, null, content);
producer.Flush();
}
var message = producer.ProduceAsync(keyName, null, contentBytes).Result;

_logger.LogDebug($"kafka topic message [{keyName}] has been published.");
if (!message.Error.HasError)
{
_logger.LogDebug($"kafka topic message [{keyName}] has been published.");

return Task.FromResult(OperateResult.Success);
return Task.FromResult(OperateResult.Success);
}
else
{
return Task.FromResult(OperateResult.Failed(new OperateError
{
Code = message.Error.Code.ToString(),
Description = message.Error.Reason
}));
}
}
}
catch (Exception ex)
{
_logger.LogError($"kafka topic message [{keyName}] has benn raised an exception of sending. the exception is: {ex.Message}");

return Task.FromResult(OperateResult.Failed(ex,
new OperateError()
{
Code = ex.HResult.ToString(),
Description = ex.Message
}));
return Task.FromResult(OperateResult.Failed(ex));
}
}
}

+ 13
- 0
src/DotNetCore.CAP.MySql/CAP.EFOptions.cs Datei anzeigen

@@ -0,0 +1,13 @@
using System;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
/// <summary>
/// EF dbcontext type.
/// </summary>
internal Type DbContextType { get; set; }
}
}

+ 44
- 0
src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs Datei anzeigen

@@ -0,0 +1,44 @@
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal class MySqlCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<MySqlOptions> _configure;

public MySqlCapOptionsExtension(Action<MySqlOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, MySqlStorage>();
services.AddScoped<IStorageConnection, MySqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();

var mysqlOptions = new MySqlOptions();
_configure(mysqlOptions);

if (mysqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(mysqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
services.AddSingleton(mysqlOptions);
}

private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
}
}

+ 13
- 0
src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs Datei anzeigen

@@ -0,0 +1,13 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }

public string TableNamePrefix { get; set; } = "cap";
}
}

+ 49
- 0
src/DotNetCore.CAP.MySql/CAP.Options.Extensions.cs Datei anzeigen

@@ -0,0 +1,49 @@
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseMySql(this CapOptions options, string connectionString)
{
return options.UseMySql(opt =>
{
opt.ConnectionString = connectionString;
});
}

public static CapOptions UseMySql(this CapOptions options, Action<MySqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new MySqlCapOptionsExtension(configure));

return options;
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

var efOptions = new EFOptions { DbContextType = typeof(TContext) };
configure(efOptions);

options.RegisterExtension(new MySqlCapOptionsExtension(configure));

return options;
}
}
}

+ 203
- 0
src/DotNetCore.CAP.MySql/CapPublisher.cs Datei anzeigen

@@ -0,0 +1,203 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.MySql
{
public class CapPublisher : ICapPublisher
{
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly DbContext _dbContext;

protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; }

protected IServiceProvider ServiceProvider { get; }

public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
MySqlOptions options)
{
ServiceProvider = provider;
_logger = logger;
_options = options;

if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}

public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

PublishCore(name, content);
}

public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

return PublishCoreAsync(name, content);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

#region private methods

private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj?.ToString();
}
return content;
}

private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}

private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
}

private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}

PublishQueuer.PulseEvent.Set();
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}

private string PrepareSql()
{
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}

#endregion private methods
}
}

+ 27
- 0
src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj Datei anzeigen

@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\build\common.props" />

<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<AssemblyName>DotNetCore.CAP.MySql</AssemblyName>
<PackageId>DotNetCore.CAP.MySql</PackageId>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50</PackageTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="MySqlConnector" Version="0.23.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 11
- 0
src/DotNetCore.CAP.MySql/FetchedMessage.cs Datei anzeigen

@@ -0,0 +1,11 @@
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.MySql
{
internal class FetchedMessage
{
public int MessageId { get; set; }

public MessageType MessageType { get; set; }
}
}

+ 61
- 0
src/DotNetCore.CAP.MySql/IAdditionalProcessor.Default.cs Datei anzeigen

@@ -0,0 +1,61 @@
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;

namespace DotNetCore.CAP.MySql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly MySqlOptions _options;

private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);

public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
MySqlOptions mysqlOptions)
{
_logger = logger;
_provider = provider;
_options = mysqlOptions;
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");

var tables = new string[]{
$"{_options.TableNamePrefix}.published",
$"{_options.TableNamePrefix}.received"
};

foreach (var table in tables)
{
var removedCount = 0;
do
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
new { now = DateTime.Now, count = MaxBatch });
}

if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}

await context.WaitAsync(_waitingInterval);
}
}
}

+ 74
- 0
src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs Datei anzeigen

@@ -0,0 +1,74 @@
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();

public MySqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}

public int MessageId { get; }

public MessageType MessageType { get; }

public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}

public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}

public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}

private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}

+ 66
- 0
src/DotNetCore.CAP.MySql/MySqlStorage.cs Datei anzeigen

@@ -0,0 +1,66 @@
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.Extensions.Logging;
using MySql.Data.MySqlClient;

namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
private readonly MySqlOptions _options;
private readonly ILogger _logger;

public MySqlStorage(ILogger<MySqlStorage> logger, MySqlOptions options)
{
_options = options;
_logger = logger;
}

public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(_options.TableNamePrefix);
using (var connection = new MySqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}

_logger.LogDebug("Ensuring all create database tables script are applied.");
}

protected virtual string CreateDbTablesScript(string prefix)
{
var batchSql =
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
`MessageType` tinyint(4) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Id` int(127) NOT NULL AUTO_INCREMENT,
`Name` varchar(400) NOT NULL,
`Group` varchar(200) DEFAULT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
`Added` datetime(6) NOT NULL,
`ExpiresAt` datetime(6) DEFAULT NULL,
`StatusName` varchar(50) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `{prefix}.published` (
`Id` int(127) NOT NULL AUTO_INCREMENT,
`Name` varchar(200) NOT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
`Added` datetime(6) NOT NULL,
`ExpiresAt` datetime(6) DEFAULT NULL,
`StatusName` varchar(40) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
return batchSql;
}
}
}

+ 152
- 0
src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs Datei anzeigen

@@ -0,0 +1,152 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;

namespace DotNetCore.CAP.MySql
{
public class MySqlStorageConnection : IStorageConnection
{
private readonly MySqlOptions _options;
private readonly string _prefix;

public MySqlStorageConnection(MySqlOptions options)
{
_options = options;
_prefix = _options.TableNamePrefix;
}

public MySqlOptions Options => _options;

public IStorageTransaction CreateTransaction()
{
return new MySqlStorageTransaction(this);
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};";

using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public Task<IFetchedMessage> FetchNextMessageAsync()
{
//Last execute statement(FOR UPDATE to fix dirty read) :

//SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
//START TRANSACTION;
//SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
//DELETE FROM `{_prefix}.queue` LIMIT 1;
//COMMIT;

var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;";

return FetchNextMessageCoreAsync(sql);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";

using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Failed}';";

using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}

// CapReceviedMessage

public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $@"
INSERT INTO `{_prefix}.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using (var connection = new MySqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};";
using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";
using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Failed}';";
using (var connection = new MySqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}

public void Dispose()
{
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new MySqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (MySqlException)
{
transaction.Dispose();
throw;
}

if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}

return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
}
}

+ 71
- 0
src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs Datei anzeigen

@@ -0,0 +1,71 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using MySql.Data.MySqlClient;

namespace DotNetCore.CAP.MySql
{
public class MySqlStorageTransaction : IStorageTransaction, IDisposable
{
private readonly string _prefix;

private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;

public MySqlStorageTransaction(MySqlStorageConnection connection)
{
var options = connection.Options;
_prefix = options.TableNamePrefix;

_dbConnection = new MySqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}

public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"UPDATE `{_prefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"UPDATE `{_prefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
}

public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
}

public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}

public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}

+ 13
- 6
src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs Datei anzeigen

@@ -1,4 +1,6 @@
// ReSharper disable once CheckNamespace
using System;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
@@ -27,16 +29,16 @@ namespace DotNetCore.CAP
public const string DefaultVHost = "/";

/// <summary>
/// Default exchange name (value: "cap").
/// Default exchange name (value: "cap.default.topic").
/// </summary>
public const string DefaultExchangeName = "cap";
public const string DefaultExchangeName = "cap.default.topic";

/// <summary> The topic exchange type. </summary>
public const string ExchangeType = "topic";

/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";

/// <summary> The topic exchange type. </summary>
internal const string ExchangeType = "topic";

/// <summary>
/// Password to use when authenticating to the server.
/// </summary>
@@ -76,5 +78,10 @@ namespace DotNetCore.CAP
/// The port to connect on.
/// </summary>
public int Port { get; set; } = -1;

/// <summary>
/// Gets or sets queue message automatic deletion time (in milliseconds). Default 864000000 ms (10 days).
/// </summary>
public int QueueMessageExpires { get; set; } = 864000000;
}
}

+ 4
- 7
src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs Datei anzeigen

@@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQCapOptionsExtension : ICapOptionsExtension
internal sealed class RabbitMQCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<RabbitMQOptions> _configure;

@@ -16,12 +16,9 @@ namespace DotNetCore.CAP

public void AddServices(IServiceCollection services)
{
services.Configure(_configure);

var rabbitMQOptions = new RabbitMQOptions();
_configure(rabbitMQOptions);

services.AddSingleton(rabbitMQOptions);
var options = new RabbitMQOptions();
_configure?.Invoke(options);
services.AddSingleton(options);

services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>();
services.AddTransient<IQueueExecutor, PublishQueueExecutor>();


+ 3
- 0
src/DotNetCore.CAP.RabbitMQ/CAP.SubscribeAttribute.cs Datei anzeigen

@@ -3,6 +3,9 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// An attribute for subscribe RabbitMQ messages.
/// </summary>
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string name) : base(name)


+ 1
- 1
src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj Datei anzeigen

@@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="4.1.3" />
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" />
</ItemGroup>

<ItemGroup>


+ 4
- 4
src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs Datei anzeigen

@@ -8,18 +8,18 @@ using RabbitMQ.Client;

namespace DotNetCore.CAP.RabbitMQ
{
public class PublishQueueExecutor : BasePublishQueueExecutor
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
private readonly RabbitMQOptions _rabbitMQOptions;

public PublishQueueExecutor(IStateChanger stateChanger,
IOptions<RabbitMQOptions> options,
RabbitMQOptions options,
ILogger<PublishQueueExecutor> logger)
: base(stateChanger, logger)
{
_logger = logger;
_rabbitMQOptions = options.Value;
_rabbitMQOptions = options;
}

public override Task<OperateResult> PublishAsync(string keyName, string content)
@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
var body = Encoding.UTF8.GetBytes(content);

channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType);
channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, RabbitMQOptions.ExchangeType, durable: true);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: keyName,
basicProperties: null,


+ 35
- 16
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs Datei anzeigen

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -7,7 +8,7 @@ using RabbitMQ.Client.Events;

namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQConsumerClient : IConsumerClient
internal sealed class RabbitMQConsumerClient : IConsumerClient
{
private readonly string _exchageName;
private readonly string _queueName;
@@ -18,7 +19,9 @@ namespace DotNetCore.CAP.RabbitMQ
private IModel _channel;
private ulong _deliveryTag;

public event EventHandler<MessageContext> MessageReceieved;
public event EventHandler<MessageContext> OnMessageReceieved;

public event EventHandler<string> OnError;

public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
{
@@ -45,31 +48,42 @@ namespace DotNetCore.CAP.RabbitMQ

_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchageName, type: RabbitMQOptions.ExchangeType);
_channel.QueueDeclare(_queueName, exclusive: false);

_channel.ExchangeDeclare(
exchange: _exchageName,
type: RabbitMQOptions.ExchangeType,
durable: true);

var arguments = new Dictionary<string, object> { { "x-message-ttl", (int)_rabbitMQOptions.QueueMessageExpires } };
_channel.QueueDeclare(_queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: arguments);
}

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));

foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
_channel.BasicConsume(_queueName, false, consumer);
while (true)
{
Task.Delay(timeout, cancellationToken).Wait();
Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult();
}
}

public void Subscribe(string topic)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}

public void Subscribe(string topic, int partition)
{
_channel.QueueBind(_queueName, _exchageName, topic);
}

public void Commit()
{
_channel.BasicAck(_deliveryTag, false);
@@ -90,7 +104,12 @@ namespace DotNetCore.CAP.RabbitMQ
Name = e.RoutingKey,
Content = Encoding.UTF8.GetString(e.Body)
};
MessageReceieved?.Invoke(sender, message);
OnMessageReceieved?.Invoke(sender, message);
}

private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
{
OnError?.Invoke(sender, e.Cause?.ToString());
}
}
}

+ 3
- 3
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs Datei anzeigen

@@ -2,13 +2,13 @@

namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQConsumerClientFactory : IConsumerClientFactory
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;

public RabbitMQConsumerClientFactory(IOptions<RabbitMQOptions> rabbitMQOptions)
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions)
{
_rabbitMQOptions = rabbitMQOptions.Value;
_rabbitMQOptions = rabbitMQOptions;
}

public IConsumerClient Create(string groupId)


+ 1
- 1
src/DotNetCore.CAP.SqlServer/CAP.EFOptions.cs Datei anzeigen

@@ -16,6 +16,6 @@ namespace DotNetCore.CAP
/// <summary>
/// EF dbcontext type.
/// </summary>
public Type DbContextType { get; internal set; }
internal Type DbContextType { get; set; }
}
}

+ 4
- 5
src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs Datei anzeigen

@@ -7,7 +7,7 @@ using Microsoft.Extensions.DependencyInjection;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerCapOptionsExtension : ICapOptionsExtension
internal class SqlServerCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<SqlServerOptions> _configure;

@@ -26,14 +26,13 @@ namespace DotNetCore.CAP
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);

var provider = TempBuildService(services);
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType);
if (dbContextObj != null)
if (sqlServerOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(sqlServerOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
services.Configure(_configure);
services.AddSingleton(sqlServerOptions);
}



+ 39
- 52
src/DotNetCore.CAP.SqlServer/CapPublisher.cs Datei anzeigen

@@ -38,25 +38,11 @@ namespace DotNetCore.CAP.SqlServer
}
}

public void Publish(string name, string content)
{
CheckIsUsingEF(name);

PublishCore(name, content);
}

public Task PublishAsync(string name, string content)
{
CheckIsUsingEF(name);

return PublishCoreAsync(name, content);
}

public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Helper.ToJson(contentObj);
var content = Serialize(contentObj);

PublishCore(name, content);
}
@@ -65,67 +51,62 @@ namespace DotNetCore.CAP.SqlServer
{
CheckIsUsingEF(name);

var content = Helper.ToJson(contentObj);
var content = Serialize(contentObj);

return PublishCoreAsync(name, content);
}

public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;
var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
IsCapOpenedTrans = true;
var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

var content = Helper.ToJson(contentObj);

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
#region private methods

PublishWithTrans(name, content, dbConnection, dbTransaction);
private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
CheckIsAdoNet(name);

if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

var content = Helper.ToJson(contentObj);

dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

#region private methods

private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
@@ -145,8 +126,11 @@ namespace DotNetCore.CAP.SqlServer
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}
@@ -155,8 +139,11 @@ namespace DotNetCore.CAP.SqlServer
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
IsCapOpenedTrans = transaction == null;
transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
}


+ 1
- 1
src/DotNetCore.CAP.SqlServer/FetchedMessage.cs Datei anzeigen

@@ -2,7 +2,7 @@

namespace DotNetCore.CAP.SqlServer
{
public class FetchedMessage
internal class FetchedMessage
{
public int MessageId { get; set; }



+ 2
- 3
src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs Datei anzeigen

@@ -6,7 +6,6 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore;

namespace DotNetCore.CAP.SqlServer
{
@@ -102,7 +101,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
var sql = $"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
@@ -115,7 +114,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
//here don't use `using` to dispose
var connection = new SqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);


+ 0
- 1
src/DotNetCore.CAP/Abstractions/ConsumerContext.cs Datei anzeigen

@@ -1,5 +1,4 @@
using System;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Abstractions
{


+ 1
- 10
src/DotNetCore.CAP/Abstractions/ModelBinding/IModelBinder.cs Datei anzeigen

@@ -7,15 +7,6 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding
/// </summary>
public interface IModelBinder
{
/// <summary>
/// Attempts to bind a model.
/// </summary>
/// <param name="bindingContext">The <see cref="ModelBindingContext"/>.</param>
/// <returns>
/// <para>
/// A <see cref="Task"/> which will complete when the model binding process completes.
/// </para>
/// </returns>
Task BindModelAsync(ModelBindingContext bindingContext);
Task<ModelBindingResult> BindModelAsync(string content);
}
}

+ 0
- 55
src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingContext.cs Datei anzeigen

@@ -1,55 +0,0 @@
using System;
using Microsoft.Extensions.Primitives;

namespace DotNetCore.CAP.Abstractions.ModelBinding
{
/// <summary>
/// A context that contains operating information for model binding and validation.
/// </summary>
public class ModelBindingContext
{
/// <summary>
/// Gets or sets the model value for the current operation.
/// </summary>
/// <remarks>
/// The <see cref="Model"/> will typically be set for a binding operation that works
/// against a pre-existing model object to update certain properties.
/// </remarks>
public object Model { get; set; }

/// <summary>
/// Gets or sets the name of the model.
/// </summary>
public string ModelName { get; set; }

/// <summary>
/// Gets or sets the type of the model.
/// </summary>
public Type ModelType { get; set; }

/// <summary>
/// Gets or sets the values of the model.
/// </summary>
public StringValues Values { get; set; }

/// <summary>
/// <para>
/// Gets or sets a result which represents the result of the model binding process.
/// </para>
/// </summary>
public object Result { get; set; }

/// <summary>
/// Creates a new <see cref="ModelBindingContext"/> for top-level model binding operation.
/// </summary>
public static ModelBindingContext CreateBindingContext(string values, string modelName, Type modelType)
{
return new ModelBindingContext()
{
ModelName = modelName,
ModelType = modelType,
Values = values
};
}
}
}

+ 105
- 0
src/DotNetCore.CAP/Abstractions/ModelBinding/ModelBindingResult.cs Datei anzeigen

@@ -0,0 +1,105 @@
using DotNetCore.CAP.Internal;

namespace DotNetCore.CAP.Abstractions.ModelBinding
{
/// <summary>
/// Contains the result of model binding.
/// </summary>
public struct ModelBindingResult
{
/// <summary>
/// Creates a <see cref="ModelBindingResult"/> representing a failed model binding operation.
/// </summary>
/// <returns>A <see cref="ModelBindingResult"/> representing a failed model binding operation.</returns>
public static ModelBindingResult Failed()
{
return new ModelBindingResult(model: null, isSuccess: false);
}

/// <summary>
/// Creates a <see cref="ModelBindingResult"/> representing a successful model binding operation.
/// </summary>
/// <param name="model">The model value. May be <c>null.</c></param>
/// <returns>A <see cref="ModelBindingResult"/> representing a successful model bind.</returns>
public static ModelBindingResult Success(object model)
{
return new ModelBindingResult(model, isSuccess: true);
}

private ModelBindingResult(object model, bool isSuccess)
{
Model = model;
IsSuccess = isSuccess;
}

/// <summary>
/// Gets the model associated with this context.
/// </summary>
public object Model { get; }

public bool IsSuccess { get; }

public override string ToString()
{
if (IsSuccess)
{
return $"Success '{Model}'";
}
else
{
return $"Failed";
}
}

public override bool Equals(object obj)
{
var other = obj as ModelBindingResult?;
if (other == null)
{
return false;
}
else
{
return Equals(other.Value);
}
}

public override int GetHashCode()
{
var hashCodeCombiner = HashCodeCombiner.Start();
hashCodeCombiner.Add(IsSuccess);
hashCodeCombiner.Add(Model);

return hashCodeCombiner.CombinedHash;
}

public bool Equals(ModelBindingResult other)
{
return
IsSuccess == other.IsSuccess &&
object.Equals(Model, other.Model);
}

/// <summary>
/// Compares <see cref="ModelBindingResult"/> objects for equality.
/// </summary>
/// <param name="x">A <see cref="ModelBindingResult"/>.</param>
/// <param name="y">A <see cref="ModelBindingResult"/>.</param>
/// <returns><c>true</c> if the objects are equal, otherwise <c>false</c>.</returns>
public static bool operator ==(ModelBindingResult x, ModelBindingResult y)
{
return x.Equals(y);
}

/// <summary>
/// Compares <see cref="ModelBindingResult"/> objects for inequality.
/// </summary>
/// <param name="x">A <see cref="ModelBindingResult"/>.</param>
/// <param name="y">A <see cref="ModelBindingResult"/>.</param>
/// <returns><c>true</c> if the objects are not equal, otherwise <c>false</c>.</returns>
public static bool operator !=(ModelBindingResult x, ModelBindingResult y)
{
return !x.Equals(y);
}
}
}

+ 1
- 1
src/DotNetCore.CAP/CAP.AppBuilderExtensions.cs Datei anzeigen

@@ -25,7 +25,7 @@ namespace Microsoft.AspNetCore.Builder

if (marker == null)
{
throw new InvalidOperationException("Add Consistency must be called on the service collection.");
throw new InvalidOperationException("Add Cap must be called on the service collection.");
}

var provider = app.ApplicationServices;


+ 22
- 18
src/DotNetCore.CAP/CAP.Options.cs Datei anzeigen

@@ -13,34 +13,45 @@ namespace DotNetCore.CAP
/// <summary>
/// Default value for polling delay timeout, in seconds.
/// </summary>
public const int DefaultPollingDelay = 8;
public const int DefaultPollingDelay = 15;

/// <summary>
/// Default processor count to process messages of cap.queue.
/// </summary>
public const int DefaultQueueProcessorCount = 2;

public CapOptions()
{
PollingDelay = DefaultPollingDelay;
QueueProcessorCount = DefaultQueueProcessorCount;
Extensions = new List<ICapOptionsExtension>();
}

/// <summary>
/// Productor job polling delay time. Default is 5 sec.
/// Productor job polling delay time. Default is 15 sec.
/// </summary>
public int PollingDelay { get; set; }

/// <summary>
/// Gets or sets the messages queue (Cap.Queue table) processor count.
/// </summary>
public int PollingDelay { get; set; } = 5;
public int QueueProcessorCount { get; set; }

/// <summary>
/// Failed messages polling delay time. Default is 2 min.
/// Failed messages polling delay time. Default is 3 min.
/// </summary>
public TimeSpan FailedMessageWaitingInterval = TimeSpan.FromMinutes(2);
public int FailedMessageWaitingInterval { get; set; } = (int)TimeSpan.FromMinutes(3).TotalSeconds;

/// <summary>
/// We’ll send a POST request to the URL below with details of any subscribed events.
/// We’ll invoke this call-back with message type,name,content when requeue failed message.
/// </summary>
public WebHook WebHook => throw new NotSupportedException();
public Action<Models.MessageType, string, string> FailedCallback { get; set; }

/// <summary>
/// Registers an extension that will be executed when building services.
/// </summary>
/// <param name="extension"></param>
public void RegisterExtension(ICapOptionsExtension extension)
/// Registers an extension that will be executed when building services.
/// </summary>
/// <param name="extension"></param>
public void RegisterExtension(ICapOptionsExtension extension)
{
if (extension == null)
throw new ArgumentNullException(nameof(extension));
@@ -48,11 +59,4 @@ namespace DotNetCore.CAP
Extensions.Add(extension);
}
}

public class WebHook
{
public string PayloadUrl { get; set; }

public string Secret { get; set; }
}
}

+ 2
- 3
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs Datei anzeigen

@@ -3,7 +3,6 @@ using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor;
@@ -35,7 +34,7 @@ namespace Microsoft.Extensions.DependencyInjection
AddSubscribeServices(services);

services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IModelBinder, DefaultModelBinder>();
services.TryAddSingleton<IModelBinderFactory, ModelBinderFactory>();
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>();

@@ -60,7 +59,7 @@ namespace Microsoft.Extensions.DependencyInjection
foreach (var serviceExtension in options.Extensions)
{
serviceExtension.AddServices(services);
}
}
services.AddSingleton(options);

return new CapBuilder(services);


+ 1
- 1
src/DotNetCore.CAP/IBootstrapper.Default.cs Datei anzeigen

@@ -40,7 +40,7 @@ namespace DotNetCore.CAP
_cts.Cancel();
try
{
_bootstrappingTask?.Wait();
_bootstrappingTask?.GetAwaiter().GetResult();
}
catch (OperationCanceledException ex)
{


+ 0
- 40
src/DotNetCore.CAP/ICapPublisher.cs Datei anzeigen

@@ -8,28 +8,6 @@ namespace DotNetCore.CAP
/// </summary>
public interface ICapPublisher
{
/// <summary>
/// (EntityFramework) Asynchronous publish a message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
Task PublishAsync(string name, string content);

/// <summary>
/// (EntityFramework) Publish a message.
/// <para>
/// If you are using the EntityFramework, you need to configure the DbContextType first.
/// otherwise you need to use overloaded method with IDbConnection and IDbTransaction.
/// </para>
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
void Publish(string name, string content);

/// <summary>
/// (EntityFramework) Asynchronous publish a object message.
/// <para>
@@ -54,24 +32,6 @@ namespace DotNetCore.CAP
/// <param name="contentObj">message body content, that will be serialized of json.</param>
void Publish<T>(string name, T contentObj);

/// <summary>
/// (ado.net) Asynchronous publish a message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

/// <summary>
/// (ado.net) Publish a message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="content">message body content.</param>
/// <param name="dbConnection">the connection of <see cref="IDbConnection"/></param>
/// <param name="dbTransaction">the transaction of <see cref="IDbTransaction"/></param>
void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null);

/// <summary>
/// (ado.net) Asynchronous publish a object message.
/// </summary>


+ 5
- 5
src/DotNetCore.CAP/IConsumerClient.cs Datei anzeigen

@@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP
{
@@ -9,14 +9,14 @@ namespace DotNetCore.CAP
/// </summary>
public interface IConsumerClient : IDisposable
{
void Subscribe(string topic);

void Subscribe(string topic, int partition);
void Subscribe(IEnumerable<string> topics);

void Listening(TimeSpan timeout, CancellationToken cancellationToken);

void Commit();

event EventHandler<MessageContext> MessageReceieved;
event EventHandler<MessageContext> OnMessageReceieved;

event EventHandler<string> OnError;
}
}

+ 9
- 6
src/DotNetCore.CAP/IConsumerHandler.Default.cs Datei anzeigen

@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
@@ -56,10 +57,7 @@ namespace DotNetCore.CAP
{
RegisterMessageProcessor(client);

foreach (var item in matchGroup.Value)
{
client.Subscribe(item.Attribute.Name);
}
client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));

client.Listening(_pollingDelay, _cts.Token);
}
@@ -95,7 +93,7 @@ namespace DotNetCore.CAP

private void RegisterMessageProcessor(IConsumerClient client)
{
client.MessageReceieved += (sender, message) =>
client.OnMessageReceieved += (sender, message) =>
{
_logger.EnqueuingReceivedMessage(message.Name, message.Content);

@@ -106,6 +104,11 @@ namespace DotNetCore.CAP
}
Pulse();
};

client.OnError += (sender, reason) =>
{
_logger.LogError(reason);
};
}

private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
@@ -116,7 +119,7 @@ namespace DotNetCore.CAP
{
StatusName = StatusName.Scheduled,
};
messageStore.StoreReceivedMessageAsync(receivedMessage).Wait();
messageStore.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult();
return receivedMessage;
}



+ 0
- 1
src/DotNetCore.CAP/IQueueExecutor.Subscibe.cs Datei anzeigen

@@ -2,7 +2,6 @@
using System.Diagnostics;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;


+ 1
- 0
src/DotNetCore.CAP/IStorageConnection.cs Datei anzeigen

@@ -56,6 +56,7 @@ namespace DotNetCore.CAP
/// Returns executed failed message.
/// </summary>
Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages();

//-----------------------------------------

/// <summary>


+ 6
- 0
src/DotNetCore.CAP/Infrastructure/Helper.cs Datei anzeigen

@@ -1,4 +1,5 @@
using System;
using System.ComponentModel;
using System.Reflection;
using Newtonsoft.Json;

@@ -68,5 +69,10 @@ namespace DotNetCore.CAP.Infrastructure
return !typeInfo.ContainsGenericParameters
&& typeInfo.Name.EndsWith("Controller", StringComparison.OrdinalIgnoreCase);
}

public static bool IsComplexType(Type type)
{
return !TypeDescriptor.GetConverter(type).CanConvertFrom(typeof(string));
}
}
}

+ 1
- 3
src/DotNetCore.CAP/Infrastructure/WebHookProvider.cs Datei anzeigen

@@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace DotNetCore.CAP.Infrastructure
{
@@ -11,4 +9,4 @@ namespace DotNetCore.CAP.Infrastructure
throw new NotImplementedException();
}
}
}
}

+ 4
- 6
src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs Datei anzeigen

@@ -1,7 +1,5 @@
using System;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Internal
@@ -10,15 +8,15 @@ namespace DotNetCore.CAP.Internal
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IModelBinder _modelBinder;
private readonly IModelBinderFactory _modelBinderFactory;

public ConsumerInvokerFactory(
ILoggerFactory loggerFactory,
IModelBinder modelBinder,
IModelBinderFactory modelBinderFactory,
IServiceProvider serviceProvider)
{
_logger = loggerFactory.CreateLogger<ConsumerInvokerFactory>();
_modelBinder = modelBinder;
_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
}

@@ -26,7 +24,7 @@ namespace DotNetCore.CAP.Internal
{
var context = new ConsumerInvokerContext(consumerContext)
{
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinder, consumerContext)
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, _modelBinderFactory, consumerContext)
};

return context.Result;


+ 81
- 0
src/DotNetCore.CAP/Internal/HashCodeCombiner.cs Datei anzeigen

@@ -0,0 +1,81 @@
using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

namespace DotNetCore.CAP.Internal
{
internal struct HashCodeCombiner
{
private long _combinedHash64;

public int CombinedHash
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get { return _combinedHash64.GetHashCode(); }
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private HashCodeCombiner(long seed)
{
_combinedHash64 = seed;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(IEnumerable e)
{
if (e == null)
{
Add(0);
}
else
{
var count = 0;
foreach (object o in e)
{
Add(o);
count++;
}
Add(count);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static implicit operator int(HashCodeCombiner self)
{
return self.CombinedHash;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(int i)
{
_combinedHash64 = ((_combinedHash64 << 5) + _combinedHash64) ^ i;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(string s)
{
var hashCode = (s != null) ? s.GetHashCode() : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(object o)
{
var hashCode = (o != null) ? o.GetHashCode() : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add<TValue>(TValue value, IEqualityComparer<TValue> comparer)
{
var hashCode = value != null ? comparer.GetHashCode(value) : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static HashCodeCombiner Start()
{
return new HashCodeCombiner(0x1505L);
}
}
}

+ 22
- 14
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs Datei anzeigen

@@ -1,7 +1,6 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Abstractions.ModelBinding;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

@@ -11,16 +10,16 @@ namespace DotNetCore.CAP.Internal
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IModelBinder _modelBinder;
private readonly IModelBinderFactory _modelBinderFactory;
private readonly ConsumerContext _consumerContext;
private readonly ObjectMethodExecutor _executor;

public DefaultConsumerInvoker(ILogger logger,
IServiceProvider serviceProvider,
IModelBinder modelBinder,
IModelBinderFactory modelBinderFactory,
ConsumerContext consumerContext)
{
_modelBinder = modelBinder;
_modelBinderFactory = modelBinderFactory;
_serviceProvider = serviceProvider;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

@@ -29,32 +28,41 @@ namespace DotNetCore.CAP.Internal
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
}

public Task InvokeAsync()
public async Task InvokeAsync()
{
using (_logger.BeginScope("consumer invoker begin"))
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);

var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());

var value = _consumerContext.DeliverMessage.Content;

if (_executor.MethodParameters.Length > 0)
{
var firstParameter = _executor.MethodParameters[0];

var bindingContext = ModelBindingContext.CreateBindingContext(value,
firstParameter.Name, firstParameter.ParameterType);

_modelBinder.BindModelAsync(bindingContext);
_executor.Execute(obj, bindingContext.Result);
try
{
var binder = _modelBinderFactory.CreateBinder(firstParameter);
var result = await binder.BindModelAsync(value);
if (result.IsSuccess)
{
_executor.Execute(obj, result.Model);
}
else
{
_logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value);
}
}
catch (FormatException ex)
{
_logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex);
}
}
else
{
_executor.Execute(obj);
}
return Task.CompletedTask;
}
}
}

+ 17
- 14
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs Datei anzeigen

@@ -58,13 +58,7 @@ namespace DotNetCore.CAP.Internal
continue;
}

foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;

executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo));
}
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}
return executorDescriptorList;
}
@@ -82,18 +76,27 @@ namespace DotNetCore.CAP.Internal
//double check
if (!Helper.IsController(typeInfo)) continue;

foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttribute<TopicAttribute>(true);
if (topicAttr == null) continue;

executorDescriptorList.Add(InitDescriptor(topicAttr, method, typeInfo));
}
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
}

return executorDescriptorList;
}

private static IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo)
{
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttrs = method.GetCustomAttributes<TopicAttribute>(true);

if (topicAttrs.Count() == 0) continue;

foreach (var attr in topicAttrs)
{
yield return InitDescriptor(attr, method, typeInfo);
}
}
}

private static ConsumerExecutorDescriptor InitDescriptor(
TopicAttribute attr,
MethodInfo methodInfo,


+ 32
- 0
src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs Datei anzeigen

@@ -0,0 +1,32 @@
using System;
using System.Reflection;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Internal
{
public class ComplexTypeModelBinder : IModelBinder
{
private readonly ParameterInfo _parameterInfo;

public ComplexTypeModelBinder(ParameterInfo parameterInfo)
{
_parameterInfo = parameterInfo;
}

public Task<ModelBindingResult> BindModelAsync(string content)
{
try
{
var type = _parameterInfo.ParameterType;
var value = Helper.FromJson(content, type);
return Task.FromResult(ModelBindingResult.Success(value));
}
catch (Exception)
{
return Task.FromResult(ModelBindingResult.Failed());
}
}
}
}

+ 0
- 47
src/DotNetCore.CAP/Internal/IModelBinder.Default.cs Datei anzeigen

@@ -1,47 +0,0 @@
using System;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Internal
{
public class DefaultModelBinder : IModelBinder
{
private Func<object> _modelCreator;

public Task BindModelAsync(ModelBindingContext bindingContext)
{
if (bindingContext.Model == null)
{
bindingContext.Model = CreateModel(bindingContext);
}

bindingContext.Result = Helper.FromJson(bindingContext.Values, bindingContext.ModelType);

return Task.CompletedTask;
}

protected virtual object CreateModel(ModelBindingContext bindingContext)
{
if (bindingContext == null)
{
throw new ArgumentNullException(nameof(bindingContext));
}

if (_modelCreator != null) return _modelCreator();
var modelTypeInfo = bindingContext.ModelType.GetTypeInfo();
if (modelTypeInfo.IsAbstract || modelTypeInfo.GetConstructor(Type.EmptyTypes) == null)
{
throw new InvalidOperationException();
}

_modelCreator = Expression
.Lambda<Func<object>>(Expression.New(bindingContext.ModelType))
.Compile();

return _modelCreator();
}
}
}

+ 86
- 0
src/DotNetCore.CAP/Internal/IModelBinder.SimpleType.cs Datei anzeigen

@@ -0,0 +1,86 @@
using System;
using System.ComponentModel;
using System.Globalization;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions.ModelBinding;

namespace DotNetCore.CAP.Internal
{
public class SimpleTypeModelBinder : IModelBinder
{
private readonly ParameterInfo _parameterInfo;
private readonly TypeConverter _typeConverter;

public SimpleTypeModelBinder(ParameterInfo parameterInfo)
{
_parameterInfo = parameterInfo ?? throw new ArgumentNullException(nameof(parameterInfo));
_typeConverter = TypeDescriptor.GetConverter(parameterInfo.ParameterType);
}

public Task<ModelBindingResult> BindModelAsync(string content)
{
if (content == null)
{
throw new ArgumentNullException(nameof(content));
}

var parameterType = _parameterInfo.ParameterType;

try
{
object model;
if (parameterType == typeof(string))
{
if (string.IsNullOrWhiteSpace(content))
{
model = null;
}
else
{
model = content;
}
}
else if (string.IsNullOrWhiteSpace(content))
{
// Other than the StringConverter, converters Trim() the value then throw if the result is empty.
model = null;
}
else
{
model = _typeConverter.ConvertFrom(
context: null,
culture: CultureInfo.CurrentCulture,
value: content);
}

if (model == null && !IsReferenceOrNullableType(parameterType))
{
return Task.FromResult(ModelBindingResult.Failed());
}
else
{
return Task.FromResult(ModelBindingResult.Success(model));
}
}
catch (Exception exception)
{
var isFormatException = exception is FormatException;
if (!isFormatException && exception.InnerException != null)
{
// TypeConverter throws System.Exception wrapping the FormatException,
// so we capture the inner exception.
exception = ExceptionDispatchInfo.Capture(exception.InnerException).SourceException;
}
throw exception;
}
}

private bool IsReferenceOrNullableType(Type type)
{
var isNullableValueType = Nullable.GetUnderlyingType(type) != null;
return !type.GetTypeInfo().IsValueType || isNullableValueType;
}
}
}

+ 10
- 0
src/DotNetCore.CAP/Internal/IModelBinderFactory.cs Datei anzeigen

@@ -0,0 +1,10 @@
using System.Reflection;
using DotNetCore.CAP.Abstractions.ModelBinding;

namespace DotNetCore.CAP.Internal
{
public interface IModelBinderFactory
{
IModelBinder CreateBinder(ParameterInfo parameter);
}
}

+ 114
- 0
src/DotNetCore.CAP/Internal/ModelBinderFactory.cs Datei anzeigen

@@ -0,0 +1,114 @@
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Runtime.CompilerServices;
using DotNetCore.CAP.Abstractions.ModelBinding;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Internal
{
/// <summary>
/// A factory for <see cref="IModelBinder"/> instances.
/// </summary>
public class ModelBinderFactory : IModelBinderFactory
{
private readonly ConcurrentDictionary<Key, IModelBinder> _cache =
new ConcurrentDictionary<Key, IModelBinder>();

public IModelBinder CreateBinder(ParameterInfo parameter)
{
if (parameter == null)
{
throw new ArgumentNullException(nameof(parameter));
}

object token = parameter;

var binder = CreateBinderCoreCached(parameter, token);
if (binder == null)
{
throw new InvalidOperationException("Format Could Not Create IModelBinder");
}

return binder;
}

private IModelBinder CreateBinderCoreCached(ParameterInfo parameterInfo, object token)
{
IModelBinder binder;
if (TryGetCachedBinder(parameterInfo, token, out binder))
{
return binder;
}
if (!Helper.IsComplexType(parameterInfo.ParameterType))
{
binder = new SimpleTypeModelBinder(parameterInfo);
}
else
{
binder = new ComplexTypeModelBinder(parameterInfo);
}

AddToCache(parameterInfo, token, binder);

return binder;
}

private void AddToCache(ParameterInfo info, object cacheToken, IModelBinder binder)
{
if (cacheToken == null)
{
return;
}

_cache.TryAdd(new Key(info, cacheToken), binder);
}

private bool TryGetCachedBinder(ParameterInfo info, object cacheToken, out IModelBinder binder)
{
if (cacheToken == null)
{
binder = null;
return false;
}

return _cache.TryGetValue(new Key(info, cacheToken), out binder);
}

private struct Key : IEquatable<Key>
{
private readonly ParameterInfo _metadata;
private readonly object _token;

public Key(ParameterInfo metadata, object token)
{
_metadata = metadata;
_token = token;
}

public bool Equals(Key other)
{
return _metadata.Equals(other._metadata) && object.ReferenceEquals(_token, other._token);
}

public override bool Equals(object obj)
{
var other = obj as Key?;
return other.HasValue && Equals(other.Value);
}

public override int GetHashCode()
{
var hash = new HashCodeCombiner();
hash.Add(_metadata);
hash.Add(RuntimeHelpers.GetHashCode(_token));
return hash;
}

public override string ToString()
{
return $"{_token} (Property: '{_metadata.Name}' Type: '{_metadata.ParameterType.Name}')";
}
}
}
}

+ 14
- 2
src/DotNetCore.CAP/LoggerExtensions.cs Datei anzeigen

@@ -14,6 +14,7 @@ namespace DotNetCore.CAP
private static readonly Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static readonly Action<ILogger, string, Exception> _executingConsumerMethod;
private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting;
private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException;

private static Action<ILogger, Exception> _jobFailed;
private static Action<ILogger, Exception> _jobFailedWillRetry;
@@ -46,12 +47,12 @@ namespace DotNetCore.CAP
_enqueuingSentMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
"Enqueuing a topic to the sent message store. NameKey: {NameKey}. Content: {Content}");
"Enqueuing a topic to the sent message store. NameKey: '{NameKey}' Content: '{Content}'.");

_enqueuingReceivdeMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
"Enqueuing a topic to the received message store. NameKey: {NameKey}. Content: {Content}");
"Enqueuing a topic to the received message store. NameKey: '{NameKey}. Content: '{Content}'.");

_executingConsumerMethod = LoggerMessage.Define<string>(
LogLevel.Error,
@@ -63,6 +64,12 @@ namespace DotNetCore.CAP
5,
"Received message topic method '{topicName}' failed to execute.");

_modelBinderFormattingException = LoggerMessage.Define<string, string, string>(
LogLevel.Error,
5,
"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{MethodName}' ParameterName:'{ParameterName}' Content:'{Content}'."
);

_jobRetrying = LoggerMessage.Define<int>(
LogLevel.Debug,
3,
@@ -154,5 +161,10 @@ namespace DotNetCore.CAP
{
_exceptionOccuredWhileExecutingJob(logger, jobId, ex);
}

public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName, string content, Exception ex)
{
_modelBinderFormattingException(logger, methodName, parameterName, content, ex);
}
}
}

+ 0
- 1
src/DotNetCore.CAP/Models/CapPublishedMessage.cs Datei anzeigen

@@ -1,5 +1,4 @@
using System;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Models
{


+ 0
- 1
src/DotNetCore.CAP/Models/CapReceivedMessage.cs Datei anzeigen

@@ -1,5 +1,4 @@
using System;
using DotNetCore.CAP.Infrastructure;

namespace DotNetCore.CAP.Models
{


+ 1
- 1
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs Datei anzeigen

@@ -49,7 +49,7 @@ namespace DotNetCore.CAP.Processor
try
{
var worked = await Step(context);
context.ThrowIfStopping();

Waiting = true;


+ 4
- 3
src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs Datei anzeigen

@@ -39,8 +39,9 @@ namespace DotNetCore.CAP.Processor

public void Start()
{
var processorCount = Environment.ProcessorCount;
var processorCount = _options.QueueProcessorCount;
_processors = GetProcessors(processorCount);

_logger.ServerStarting(processorCount, _processors.Length);

_context = new ProcessingContext(_provider, _cts.Token);
@@ -61,7 +62,7 @@ namespace DotNetCore.CAP.Processor

_logger.LogTrace("Pulsing the Queuer.");

PublishQueuer.PulseEvent.Set();
PublishQueuer.PulseEvent.Set();
}

public void Dispose()
@@ -76,7 +77,7 @@ namespace DotNetCore.CAP.Processor
_cts.Cancel();
try
{
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
_compositeTask.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
}
catch (AggregateException ex)
{


+ 38
- 5
src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs Datei anzeigen

@@ -1,7 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
@@ -30,7 +27,7 @@ namespace DotNetCore.CAP.Processor
_logger = logger;
_provider = provider;
_stateChanger = stateChanger;
_waitingInterval = _options.FailedMessageWaitingInterval;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedMessageWaitingInterval);
}

public async Task ProcessAsync(ProcessingContext context)
@@ -56,14 +53,32 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessPublishedAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedPublishedMessages();
var hasException = false;

foreach (var message in messages)
{
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(Models.MessageType.Publish, message.Name, message.Content);

}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}

using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}

context.ThrowIfStopping();

await context.WaitAsync(_delay);
}
}
@@ -71,16 +86,34 @@ namespace DotNetCore.CAP.Processor
private async Task ProcessReceivededAsync(IStorageConnection connection, ProcessingContext context)
{
var messages = await connection.GetFailedReceviedMessages();
var hasException = false;

foreach (var message in messages)
{
if (!hasException)
{
try
{
_options.FailedCallback?.Invoke(Models.MessageType.Subscribe, message.Name, message.Content);

}
catch (Exception ex)
{
hasException = true;
_logger.LogWarning("Failed call-back method raised an exception:" + ex.Message);
}
}

using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
await transaction.CommitAsync();
}

context.ThrowIfStopping();

await context.WaitAsync(_delay);
}
}
}
}
}

+ 2
- 2
src/DotNetCore.CAP/QueueExecutorFactory.cs Datei anzeigen

@@ -18,8 +18,8 @@ namespace DotNetCore.CAP
{
var queueExectors = _serviceProvider.GetServices<IQueueExecutor>();

return messageType == MessageType.Publish
? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor)
return messageType == MessageType.Publish
? queueExectors.FirstOrDefault(x => x is BasePublishQueueExecutor)
: queueExectors.FirstOrDefault(x => !(x is BasePublishQueueExecutor));
}
}

+ 47
- 0
test/DotNetCore.CAP.MySql.Test/ConnectionUtil.cs Datei anzeigen

@@ -0,0 +1,47 @@
using System;
using MySql.Data.MySqlClient;

namespace DotNetCore.CAP.MySql.Test
{
public static class ConnectionUtil
{
private const string DatabaseVariable = "Cap_MySql_DatabaseName";
private const string ConnectionStringTemplateVariable = "Cap_MySql_ConnectionStringTemplate";

private const string MasterDatabaseName = "information_schema";
private const string DefaultDatabaseName = @"DotNetCore.CAP.MySql.Test";

private const string DefaultConnectionStringTemplate =
@"Server=localhost;Database={0};Uid=root;Pwd=123123;";

public static string GetDatabaseName()
{
return Environment.GetEnvironmentVariable(DatabaseVariable) ?? DefaultDatabaseName;
}

public static string GetMasterConnectionString()
{
return string.Format(GetConnectionStringTemplate(), MasterDatabaseName);
}

public static string GetConnectionString()
{
return string.Format(GetConnectionStringTemplate(), GetDatabaseName());
}

private static string GetConnectionStringTemplate()
{
return
Environment.GetEnvironmentVariable(ConnectionStringTemplateVariable) ??
DefaultConnectionStringTemplate;
}

public static MySqlConnection CreateConnection(string connectionString = null)
{
connectionString = connectionString ?? GetConnectionString();
var connection = new MySqlConnection(connectionString);
connection.Open();
return connection;
}
}
}

+ 68
- 0
test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs Datei anzeigen

@@ -0,0 +1,68 @@
using System.Data;
using System.Threading;
using Dapper;
using Microsoft.EntityFrameworkCore;

namespace DotNetCore.CAP.MySql.Test
{
public abstract class DatabaseTestHost : TestHost
{
private static bool _sqlObjectInstalled;
public static object _lock = new object();

protected override void PostBuildServices()
{
base.PostBuildServices();
lock (_lock)
{
if (!_sqlObjectInstalled)
{
InitializeDatabase();
}
}
}

public override void Dispose()
{
DeleteAllData();
base.Dispose();
}

private void InitializeDatabase()
{
using (CreateScope())
{
var storage = GetService<MySqlStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}

private void CreateDatabase()
{
var masterConn = ConnectionUtil.GetMasterConnectionString();
var databaseName = ConnectionUtil.GetDatabaseName();
using (var connection = ConnectionUtil.CreateConnection(masterConn))
{
connection.Execute($@"
DROP DATABASE IF EXISTS `{databaseName}`;
CREATE DATABASE `{databaseName}`;");
}
}

private void DeleteAllData()
{
var conn = ConnectionUtil.GetConnectionString();

using (var connection = ConnectionUtil.CreateConnection(conn))
{
connection.Execute($@"
TRUNCATE TABLE `cap.published`;
TRUNCATE TABLE `cap.received`;
TRUNCATE TABLE `cap.queue`;");
}
}
}
}

+ 46
- 0
test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj Datei anzeigen

@@ -0,0 +1,46 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp1.1</TargetFramework>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyName>DotNetCore.CAP.MySql.Test</AssemblyName>
<PackageId>DotNetCore.CAP.MySql.Test</PackageId>
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
<PackageTargetFallback>$(PackageTargetFallback);dnxcore50;portable-net451+win8</PackageTargetFallback>
<RuntimeFrameworkVersion>1.1.1</RuntimeFrameworkVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Shared\*.cs" Exclude="bin\**;obj\**;**\*.xproj;packages\**" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.MySql\DotNetCore.CAP.MySql.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.0.0" />
<PackageReference Include="MySqlConnector" Version="0.24.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" />
<PackageReference Include="xunit" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="1.1.1" />
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="1.1.2" />
</ItemGroup>
<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>

</Project>

+ 134
- 0
test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs Datei anzeigen

@@ -0,0 +1,134 @@
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Xunit;

namespace DotNetCore.CAP.MySql.Test
{
[Collection("MySql")]
public class MySqlStorageConnectionTest : DatabaseTestHost
{
private MySqlStorageConnection _storage;

public MySqlStorageConnectionTest()
{
var options = GetService<MySqlOptions>();
_storage = new MySqlStorageConnection(options);
}

[Fact]
public async Task GetPublishedMessageAsync_Test()
{
var sql = "INSERT INTO `cap.published`(`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
var publishMessage = new CapPublishedMessage
{
Name = "MySqlStorageConnectionTest",
Content = "",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, publishMessage);
}
var message = await _storage.GetPublishedMessageAsync(insertedId);
Assert.NotNull(message);
Assert.Equal("MySqlStorageConnectionTest", message.Name);
Assert.Equal(StatusName.Scheduled, message.StatusName);
}

[Fact]
public async Task FetchNextMessageAsync_Test()
{
var sql = "INSERT INTO `Cap.Queue`(`MessageId`,`MessageType`) VALUES(@MessageId,@MessageType);";
var queue = new CapQueue
{
MessageId = 3333,
MessageType = MessageType.Publish
};
using (var connection = ConnectionUtil.CreateConnection())
{
connection.Execute(sql, queue);
}
var fetchedMessage = await _storage.FetchNextMessageAsync();
fetchedMessage.Dispose();
Assert.NotNull(fetchedMessage);
Assert.Equal(MessageType.Publish, fetchedMessage.MessageType);
Assert.Equal(3333, fetchedMessage.MessageId);
}

[Fact]
public async Task StoreReceivedMessageAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Name = "MySqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};

Exception exception = null;
try
{
await _storage.StoreReceivedMessageAsync(receivedMessage);
}
catch (Exception ex)
{
exception = ex;
}
Assert.Null(exception);
}

[Fact]
public async Task GetReceivedMessageAsync_Test()
{

var sql = $@"
INSERT INTO `cap.received`(`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;";
var receivedMessage = new CapReceivedMessage
{
Name = "MySqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
var insertedId = default(int);
using (var connection = ConnectionUtil.CreateConnection())
{
insertedId = connection.QueryFirst<int>(sql, receivedMessage);
}

var message = await _storage.GetReceivedMessageAsync(insertedId);

Assert.NotNull(message);
Assert.Equal(StatusName.Scheduled, message.StatusName);
Assert.Equal("MySqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}

[Fact]
public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test()
{
var receivedMessage = new CapReceivedMessage
{
Name = "MySqlStorageConnectionTest",
Content = "",
Group = "mygroup",
StatusName = StatusName.Scheduled
};
await _storage.StoreReceivedMessageAsync(receivedMessage);

var message = await _storage.GetNextReceviedMessageToBeEnqueuedAsync();

Assert.NotNull(message);
Assert.Equal(StatusName.Scheduled, message.StatusName);
Assert.Equal("MySqlStorageConnectionTest", message.Name);
Assert.Equal("mygroup", message.Group);
}

}
}

+ 71
- 0
test/DotNetCore.CAP.MySql.Test/MySqlStorageTest.cs Datei anzeigen

@@ -0,0 +1,71 @@
using Xunit;
using Dapper;

namespace DotNetCore.CAP.MySql.Test
{
[Collection("MySql")]
public class MySqlStorageTest : DatabaseTestHost
{
private readonly string _dbName;
private readonly string _masterDbConnectionString;


public MySqlStorageTest()
{
_dbName = ConnectionUtil.GetDatabaseName();
_masterDbConnectionString = ConnectionUtil.GetMasterConnectionString();
}

[Fact]
public void Database_IsExists()
{
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var databaseName = ConnectionUtil.GetDatabaseName();
var sql = $@"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME = '{databaseName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.True(databaseName.Equals(result, System.StringComparison.CurrentCultureIgnoreCase));
}
}

[Fact]
public void DatabaseTable_Published_IsExists()
{
var tableName = "cap.published";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}

[Fact]
public void DatabaseTable_Queue_IsExists()
{
var tableName = "cap.queue";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}

[Fact]
public void DatabaseTable_Received_IsExists()
{
var tableName = "cap.received";
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString))
{
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'";
var result = connection.QueryFirstOrDefault<string>(sql);
Assert.NotNull(result);
Assert.Equal(tableName, result);
}
}
}
}

+ 98
- 0
test/DotNetCore.CAP.MySql.Test/TestHost.cs Datei anzeigen

@@ -0,0 +1,98 @@
using System;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.MySql.Test
{
public abstract class TestHost : IDisposable
{
protected IServiceCollection _services;
protected string _connectionString;
private IServiceProvider _provider;
private IServiceProvider _scopedProvider;

public TestHost()
{
CreateServiceCollection();
PreBuildServices();
BuildServices();
PostBuildServices();
}

protected IServiceProvider Provider => _scopedProvider ?? _provider;

private void CreateServiceCollection()
{
var services = new ServiceCollection();

services.AddOptions();
services.AddLogging();

_connectionString = ConnectionUtil.GetConnectionString();
services.AddSingleton(new MySqlOptions { ConnectionString = _connectionString });
services.AddSingleton<MySqlStorage>();

_services = services;
}

protected virtual void PreBuildServices()
{
}

private void BuildServices()
{
_provider = _services.BuildServiceProvider();
}

protected virtual void PostBuildServices()
{
}

public IDisposable CreateScope()
{
var scope = CreateScope(_provider);
var loc = scope.ServiceProvider;
_scopedProvider = loc;
return new DelegateDisposable(() =>
{
if (_scopedProvider == loc)
{
_scopedProvider = null;
}
scope.Dispose();
});
}

public IServiceScope CreateScope(IServiceProvider provider)
{
var scope = provider.GetService<IServiceScopeFactory>().CreateScope();
return scope;
}

public T GetService<T>() => Provider.GetService<T>();

public T Ensure<T>(ref T service)
where T : class
=> service ?? (service = GetService<T>());

public virtual void Dispose()
{
(_provider as IDisposable)?.Dispose();
}

private class DelegateDisposable : IDisposable
{
private Action _dispose;

public DelegateDisposable(Action dispose)
{
_dispose = dispose;
}

public void Dispose()
{
_dispose();
}
}
}
}

+ 1
- 1
test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs Datei anzeigen

@@ -35,7 +35,7 @@ namespace DotNetCore.CAP.SqlServer.Test
var storage = GetService<SqlServerStorage>();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).Wait();
storage.InitializeAsync(token).GetAwaiter().GetResult();
_sqlObjectInstalled = true;
}
}


Laden…
Abbrechen
Speichern