Browse Source

Merge branch 'master' into supports/nats

# Conflicts:
#	CAP.sln
#	CAP.sln.DotSettings
#	src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
#	src/DotNetCore.CAP/Transport/MqLogType.cs
master
Savorboard 3 years ago
parent
commit
607a99025c
100 changed files with 1490 additions and 373 deletions
  1. +1
    -1
      .travis.yml
  2. +13
    -13
      CAP.sln
  3. +1
    -0
      CAP.sln.DotSettings
  4. +35
    -18
      README.md
  5. +1
    -1
      README.zh-cn.md
  6. +3
    -3
      appveyor.yml
  7. +1
    -1
      build/BuildScript.csproj
  8. +2
    -2
      build/version.props
  9. +1
    -1
      docs/content/about/license.md
  10. +122
    -0
      docs/content/about/release-notes.md
  11. BIN
     
  12. BIN
     
  13. +4
    -4
      docs/content/index.md
  14. +19
    -19
      docs/content/user-guide/en/cap/configuration.md
  15. +7
    -7
      docs/content/user-guide/en/cap/idempotence.md
  16. +96
    -11
      docs/content/user-guide/en/cap/messaging.md
  17. +5
    -5
      docs/content/user-guide/en/cap/serialization.md
  18. +8
    -8
      docs/content/user-guide/en/cap/transactions.md
  19. +8
    -4
      docs/content/user-guide/en/getting-started/introduction.md
  20. +41
    -1
      docs/content/user-guide/en/monitoring/consul.md
  21. +4
    -4
      docs/content/user-guide/en/monitoring/dashboard.md
  22. +24
    -2
      docs/content/user-guide/en/monitoring/diagnostics.md
  23. +0
    -3
      docs/content/user-guide/en/monitoring/metrics.md
  24. +0
    -34
      docs/content/user-guide/en/persistent/in-memory-storage.md
  25. +1
    -1
      docs/content/user-guide/en/samples/faq.md
  26. +1
    -1
      docs/content/user-guide/en/samples/github.md
  27. +18
    -7
      docs/content/user-guide/en/storage/general.md
  28. +34
    -0
      docs/content/user-guide/en/storage/in-memory-storage.md
  29. +2
    -2
      docs/content/user-guide/en/storage/mongodb.md
  30. +2
    -2
      docs/content/user-guide/en/storage/mysql.md
  31. +2
    -2
      docs/content/user-guide/en/storage/postgresql.md
  32. +6
    -3
      docs/content/user-guide/en/storage/sqlserver.md
  33. +90
    -0
      docs/content/user-guide/en/transport/aws-sqs.md
  34. +4
    -6
      docs/content/user-guide/en/transport/azure-service-bus.md
  35. +13
    -0
      docs/content/user-guide/en/transport/general.md
  36. +3
    -2
      docs/content/user-guide/en/transport/in-memory-queue.md
  37. +6
    -6
      docs/content/user-guide/en/transport/kafka.md
  38. +11
    -3
      docs/content/user-guide/en/transport/rabbitmq.md
  39. +46
    -3
      docs/content/user-guide/zh/cap/messaging.md
  40. +2
    -2
      docs/content/user-guide/zh/cap/serialization.md
  41. +4
    -0
      docs/content/user-guide/zh/getting-started/introduction.md
  42. +41
    -1
      docs/content/user-guide/zh/monitoring/consul.md
  43. +3
    -3
      docs/content/user-guide/zh/monitoring/diagnostics.md
  44. +0
    -3
      docs/content/user-guide/zh/monitoring/metrics.md
  45. +7
    -1
      docs/content/user-guide/zh/storage/general.md
  46. +0
    -0
      docs/content/user-guide/zh/storage/in-memory-storage.md
  47. +0
    -0
      docs/content/user-guide/zh/storage/mongodb.md
  48. +0
    -0
      docs/content/user-guide/zh/storage/mysql.md
  49. +0
    -0
      docs/content/user-guide/zh/storage/postgresql.md
  50. +4
    -1
      docs/content/user-guide/zh/storage/sqlserver.md
  51. +91
    -0
      docs/content/user-guide/zh/transport/aws-sqs.md
  52. +0
    -0
      docs/content/user-guide/zh/transport/azure-service-bus.md
  53. +13
    -2
      docs/content/user-guide/zh/transport/general.md
  54. +0
    -0
      docs/content/user-guide/zh/transport/in-memory-queue.md
  55. +0
    -0
      docs/content/user-guide/zh/transport/kafka.md
  56. +8
    -0
      docs/content/user-guide/zh/transport/rabbitmq.md
  57. +36
    -36
      docs/mkdocs.yml
  58. +4
    -4
      samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs
  59. +1
    -1
      samples/Sample.AmazonSQS.InMemory/Program.cs
  60. +2
    -2
      samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj
  61. +4
    -3
      samples/Sample.AmazonSQS.InMemory/Startup.cs
  62. +1
    -1
      samples/Sample.AmazonSQS.InMemory/appsettings.json
  63. +1
    -1
      samples/Sample.ConsoleApp/Program.cs
  64. +2
    -2
      samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj
  65. +2
    -2
      samples/Sample.Kafka.PostgreSql/Sample.Kafka.PostgreSql.csproj
  66. +1
    -1
      samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj
  67. +1
    -1
      samples/Sample.RabbitMQ.MongoDB/Startup.cs
  68. +2
    -2
      samples/Sample.RabbitMQ.MySql/AppDbContext.cs
  69. +1
    -1
      samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs
  70. +3
    -3
      samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
  71. +2
    -14
      samples/Sample.RabbitMQ.SqlServer/AppDbContext.cs
  72. +18
    -8
      samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs
  73. +4
    -4
      samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj
  74. +1
    -1
      samples/Sample.RabbitMQ.SqlServer/Startup.cs
  75. +3
    -2
      src/Directory.Build.props
  76. +212
    -0
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
  77. +31
    -0
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs
  78. +17
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs
  79. +30
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs
  80. +30
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs
  81. +23
    -0
      src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj
  82. +125
    -0
      src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs
  83. +18
    -0
      src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs
  84. +16
    -0
      src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs
  85. +4
    -4
      src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj
  86. +3
    -2
      src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs
  87. +6
    -3
      src/DotNetCore.CAP.Dashboard/CAP.DashboardMiddleware.cs
  88. +2
    -1
      src/DotNetCore.CAP.Dashboard/CAP.DashboardOptionsExtensions.cs
  89. +8
    -8
      src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs
  90. +2
    -4
      src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj
  91. +4
    -17
      src/DotNetCore.CAP.Dashboard/JsonDispatcher.cs
  92. +3
    -16
      src/DotNetCore.CAP.Dashboard/JsonStats.cs
  93. +6
    -0
      src/DotNetCore.CAP.Dashboard/NodeDiscovery/CAP.DiscoveryOptions.cs
  94. +14
    -9
      src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.Consul.cs
  95. +5
    -5
      src/DotNetCore.CAP.Dashboard/Pages/HomePage.cshtml
  96. +5
    -5
      src/DotNetCore.CAP.Dashboard/Pages/HomePage.generated.cs
  97. +1
    -1
      src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml
  98. +1
    -1
      src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs
  99. +2
    -2
      src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj
  100. +25
    -13
      src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs

+ 1
- 1
.travis.yml View File

@@ -2,7 +2,7 @@ language: csharp
sudo: required
dist: xenial
solution: CAP.sln
dotnet: 3.1.100
dotnet: 5.0.100
mono: none
env:
- Cap_MySql_ConnectionString="Server=127.0.0.1;Database=cap_test;Uid=root;Pwd=;Allow User Variables=True;SslMode=none"


+ 13
- 13
CAP.sln View File

@@ -51,8 +51,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureService
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Dashboard", "src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj", "{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.InMemory", "samples\Sample.Kafka.InMemory\Sample.Kafka.InMemory.csproj", "{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{F6C5C676-AF05-46D5-A45D-442137B31898}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.PostgreSql", "samples\Sample.Kafka.PostgreSql\Sample.Kafka.PostgreSql.csproj", "{F1EF1D26-8A6B-403E-85B0-250DF44A4A7C}"
@@ -65,7 +63,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{25A1B3A1-DD74-436C-9956-17E04FE7643D}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory", "samples\Sample.AmazonSQS.InMemory\Sample.AmazonSQS.InMemory.csproj", "{B187DD15-092D-4B72-9807-50856607D237}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -121,10 +121,6 @@ Global
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.Build.0 = Release|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.Build.0 = Release|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -149,10 +145,14 @@ Global
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.Build.0 = Release|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -170,14 +170,14 @@ Global
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F6C5C676-AF05-46D5-A45D-442137B31898} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F1EF1D26-8A6B-403E-85B0-250DF44A4A7C} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F8EF381A-FE83-40B3-A63D-09D83851B0FB} = {10C0818D-9160-4B80-BB86-DDE925B64D43}
{93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{25A1B3A1-DD74-436C-9956-17E04FE7643D} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 1
- 0
CAP.sln.DotSettings View File

@@ -1,6 +1,7 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=DB/@EntryIndexedValue">DB</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=NATS/@EntryIndexedValue">NATS</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SNS/@EntryIndexedValue">SNS</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Mongo/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NATS/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Postgre/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

+ 35
- 18
README.md View File

@@ -2,7 +2,7 @@
<img height="140" src="https://cap.dotnetcore.xyz/img/logo.svg">
</p>

# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md)
# CAP                     [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md)
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/v8gfh6pe2u2laqoa/branch/master?svg=true)](https://ci.appveyor.com/project/yang-xiaodong/cap/branch/master)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
@@ -10,11 +10,11 @@
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)

CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently.
CAP is a library based on .Net standard, which is a solution to deal with distributed transactions, has the function of EventBus, it is lightweight, easy to use, and efficient.

In the process of building an SOA or MicroService system, we usually need to use the event to integrate each services. In the process, the simple use of message queue does not guarantee the reliability. CAP is adopted the local message table program integrated with the current database to solve the exception may occur in the process of the distributed system calling each other. It can ensure that the event messages are not lost in any case.
In the process of building an SOA or MicroService system, we usually need to use the event to integrate each service. In the process, simple use of message queue does not guarantee reliability. CAP adopts local message table program integrated with the current database to solve exceptions that may occur in the process of the distributed system calling each other. It can ensure that the event messages are not lost in any case.

You can also use the CAP as an EventBus. The CAP provides a simpler way to implement event publishing and subscriptions. You do not need to inherit or implement any interface during the process of subscription and sending.
You can also use CAP as an EventBus. CAP provides a simpler way to implement event publishing and subscriptions. You do not need to inherit or implement any interface during subscription and sending process.

## Architecture overview

@@ -26,18 +26,19 @@ You can also use the CAP as an EventBus. The CAP provides a simpler way to imple

### NuGet

You can run the following command to install the CAP in your project.
CAP can be installed in your project with the following command.

```
PM> Install-Package DotNetCore.CAP
```

CAP supports RabbitMQ,Kafka and AzureService as message queue, select the packages you need to install:
CAP supports RabbitMQ, Kafka, AzureService, AmazonSQS as message queue, following packages are available to install:

```
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS
```

CAP supports SqlServer, MySql, PostgreSql,MongoDB as event log storage.
@@ -53,7 +54,7 @@ PM> Install-Package DotNetCore.CAP.MongoDB //need MongoDB 4.0+ cluster

### Configuration

First,You need to config CAP in your Startup.cs:
First, you need to configure CAP in your Startup.cs:

```cs
public void ConfigureServices(IServiceCollection services)
@@ -80,6 +81,7 @@ public void ConfigureServices(IServiceCollection services)
x.UseRabbitMQ("ConnectionString");
x.UseKafka("ConnectionString");
x.UseAzureServiceBus("ConnectionString");
x.UseAmazonSQS();
});
}

@@ -87,7 +89,7 @@ public void ConfigureServices(IServiceCollection services)

### Publish

Inject `ICapPublisher` in your Controller, then use the `ICapPublisher` to send message
Inject `ICapPublisher` in your Controller, then use the `ICapPublisher` to send messages

```c#
public class PublishController : Controller
@@ -135,7 +137,7 @@ public class PublishController : Controller

**In Controller Action**

Add the Attribute `[CapSubscribe()]` on Action to subscribe message:
Add the Attribute `[CapSubscribe()]` on Action to subscribe to messages:

```c#
public class PublishController : Controller
@@ -151,7 +153,7 @@ public class PublishController : Controller

**In Business Logic Service**

If your subscribe method is not in the Controller,then your subscribe class need to Inheritance `ICapSubscribe`:
If your subscription method is not in the Controller, then your subscribe class needs to implement `ICapSubscribe` interface:

```c#

@@ -159,7 +161,7 @@ namespace BusinessCode.Service
{
public interface ISubscriberService
{
public void CheckReceivedMessage(DateTime datetime);
void CheckReceivedMessage(DateTime datetime);
}

public class SubscriberService: ISubscriberService, ICapSubscribe
@@ -173,7 +175,7 @@ namespace BusinessCode.Service

```

Then inject your `ISubscriberService` class in Startup.cs
Then register your class that implements `ISubscriberService` in Startup.cs

```c#
public void ConfigureServices(IServiceCollection services)
@@ -187,15 +189,30 @@ public void ConfigureServices(IServiceCollection services)
});
}
```
#### Use partials for topic subscriptions

To group topic subscriptions on class level you're able to define a subscription on a method as a partial. Subscriptions on the message queue will then be a combination of the topic defined on the class and the topic defined on the method. In the following example the `Create(..)` function will be invoked when receiving a message on `customers.create`

```c#
[CapSubscribe("customers")]
public class CustomersSubscriberService : ICapSubscribe
{
[CapSubscribe("create", isPartial: true)]
public void Create(Customer customer)
{
}
}
```


#### Subscribe Group

The concept of a subscription group is similar to that of a consumer group in Kafka. it is the same as the broadcast mode in the message queue, which is used to process the same message between multiple different microservice instances.

When CAP startup, it will use the current assembly name as the default group name, if multiple same group subscribers subscribe the same topic name, there is only one subscriber can receive the message.
When CAP startups, it will use the current assembly name as the default group name, if multiple same group subscribers subscribe to the same topic name, there is only one subscriber that can receive the message.
Conversely, if subscribers are in different groups, they will all receive messages.

In the same application, you can specify the `Group` property to keep they are in different subscribe groups:
In the same application, you can specify `Group` property to keep subscriptions in different subscribe groups:

```C#

@@ -212,7 +229,7 @@ public void ShowTime2(DateTime datetime)
```
`ShowTime1` and `ShowTime2` will be called at the same time.

BTW, You can specify the default group name in the configuration :
BTW, You can specify the default group name in the configuration:

```C#
services.AddCap(x =>
@@ -224,13 +241,13 @@ services.AddCap(x =>

### Dashboard

CAP v2.1+ provides the dashboard pages, you can easily view the sent and received messages. In addition, you can also view the message status in real time on the dashboard. Use the following command to install the Dashboard in your project.
CAP v2.1+ provides dashboard pages, you can easily view messages that were sent and received. In addition, you can also view the message status in real time in the dashboard. Use the following command to install the Dashboard in your project.

```
PM> Install-Package DotNetCore.CAP.Dashboard
```

In the distributed environment, the dashboard built-in integrated [Consul](http://consul.io) as a node discovery, while the realization of the gateway agent function, you can also easily view the node or other node data, It's like you are visiting local resources.
In the distributed environment, the dashboard built-in integrates [Consul](http://consul.io) as a node discovery, while the realization of the gateway agent function, you can also easily view the node or other node data, It's like you are visiting local resources.

```c#
services.AddCap(x =>
@@ -253,7 +270,7 @@ services.AddCap(x =>
});
```

The default dashboard address is :[http://localhost:xxx/cap](http://localhost:xxx/cap), you can also configure the `/cap` suffix with `x.UseDashboard(opt =>{ opt.MatchPath="/mycap"; })`.
The default dashboard address is :[http://localhost:xxx/cap](http://localhost:xxx/cap), you can configure relative path `/cap` with `x.UseDashboard(opt =>{ opt.MatchPath="/mycap"; })`.

![dashboard](http://images2017.cnblogs.com/blog/250417/201710/250417-20171004220827302-189215107.png)



+ 1
- 1
README.zh-cn.md View File

@@ -174,7 +174,7 @@ namespace xxx.Service
{
public interface ISubscriberService
{
public void CheckReceivedMessage(DateTime datetime);
void CheckReceivedMessage(DateTime datetime);
}

public class SubscriberService: ISubscriberService, ICapSubscribe


+ 3
- 3
appveyor.yml View File

@@ -12,12 +12,12 @@ build_script:
- ps: flubu
test: off
artifacts:
- path: artifacts/*.nupkg
- path: artifacts/**
deploy:
provider: NuGet
on:
appveyor_repo_tag: true
api_key:
secure: PZXRBOGLyhYLP7ulHfrh6MnkqB8CstuitgbLcJr3cZkLJLLzPH0ahvuTtmhWxtR2
skip_symbols: true
artifact: /artifacts\/.+\.nupkg/
skip_symbols: false
artifact: /artifacts\/.+\.s?nupkg/

+ 1
- 1
build/BuildScript.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>


+ 2
- 2
build/version.props View File

@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<VersionMajor>3</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionMajor>5</VersionMajor>
<VersionMinor>0</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>


+ 1
- 1
docs/content/about/license.md View File

@@ -2,7 +2,7 @@

**MIT License**

Copyright (c) 2016 - 2019 Savorboard
Copyright (c) 2016 - 2020 Savorboard

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal


+ 122
- 0
docs/content/about/release-notes.md View File

@@ -1,5 +1,127 @@
# Release Notes

## Version 3.1.1 (2020-09-23)

**Features:**

* Add consumer parameter with interface suppport. (#669)
* Add custom correlation id and message id support. (#668)
* Enhanced custom serialization support. (#641)

**Bug Fixed:**

* Solve the issue of being duplicated executors from different assemblies. (#666)
* Added comparer to remove duplicate ConsumerExecutors. (#653)
* Add re-enable the auto create topics configuration item for Kafka, it's false by default. now is true. (#635)
* Fixed postgresql transaction rollback invoke bug. (#640)
* Fixed SQLServer table name customize bug. (#632)

## Version 3.1.0 (2020-08-15)

**Features:**

* Add Amazon SQS support. (#597)
* Remove Dapper and replace with ADO.NET in storage project. (#583)
* Add debug symbols package to nuget.
* Upgrade dependent nuget package version to latest.
* English docs grammar correction. Thanks @mzorec

**Bug Fixed:**

* Fix mysql transaction rollback bug. (#598)
* Fix dashboard query bug. (#600)
* Fix mongo db query bug. (#611)
* Fix dashboard browser language detection bug. (#631)

## Version 3.0.4 (2020-05-27)

**Bug Fixed:**

* Fix kafka consumer group does not works bug. (#541)
* Fix cast object to primitive types failed bug. (#547)
* Fix subscriber primitive types convert exception. (#568)
* Add conosole app sample.
* Upgrade Confluent.Kafka to 1.4.3


## Version 3.0.3 (2020-04-01)

**Bug Fixed:**

* Change ISubscribeInvoker interface access modifier to public. (#537)
* Fix rabbitmq connection may be reused when close forced. (#533)
* Fix dahsboard message reexecute button throws exception bug. (#525)

## Version 3.0.2 (2020-02-05)

**Bug Fixed:**

- Fixed diagnostics event data object error. (#504 )
- Fixed RabbitMQ transport check not working. (#503 )
- Fixed Azure Service Bus subscriber error. (#502 )

## Version 3.0.1 (2020-01-19)

**Bug Fixed:**

* Fixed Dashboard requeue and reconsume failed bug. (#482 )
* Fixed Azure service bus null reference exception. (#483 )
* Fixed type cast exception from storage. (#473 )
* Fixed SqlServer connection undisponse bug. (#477 )

## Version 3.0.0 (2019-12-30)

**Breaking Changes:**

In this version, we have made major improvements to the code structure, which have introduced some destructive changes.

* Publisher and Consumer are not compatible with older versions
This version is not compatible with older versions of the message protocol because we have improved the format in which messages are published and stored.

* Interface changes
We have done a lot of refactoring of the code, and some of the interfaces may be incompatible with older versions

* Detach the dashboard project

**Features:**

* Supports .NET Core 3.1.
* Upgrade dependent packages.
* New serialization interface `ISerializer` to support serialization of message body sent to MQ.
* Add new api for `ICapPublisher` to publish message with headers.
* Diagnostics event structure and names improved. #378
* Support consumer method to read the message headers. #472
* Support rename message storage tables. #435
* Support for Kafka to write such as Offset and Partition to the header. #374
* Improved the processor retry interval time. #444

**Bug Fixed:**

* Fixed SqlServer dashboard sql query bug. #470
* Fixed Kafka health check bug. #436
* Fixed dashboard bugs. #412 #404
* Fixed transaction bug for sql server when using EF. #402


## Version 2.6.0 (2019-08-29)

**Features:**

* Improvement Diagnostic support. Thanks [@gfx687](https://github.com/gfx687)
* Improvement documention. https://cap.dotnetcore.xyz
* Improvement `ConsumerInvoker` implementation. Thanks [@hetaoos](https://github.com/hetaoos)
* Support multiple consumer threads. (#295)
* Change DashboardMiddleware to async. (#390) Thanks [@liuzhenyulive](https://github.com/liuzhenyulive)

**Bug Fixed:**

* SQL Server Options Bug.
* Fix transaction scope disposed bug. (#365)
* Fix thread safe issue of ICapPublisher bug. (#371)
* Improved Ctrl+C action raised exception issue.
* Fixed asynchronous exception catching bug of sending.
* Fix MatchPoundUsingRegex "." not escaped bug (#373)

## Version 2.5.1 (2019-06-21)

**Features:**


BIN
View File


BIN
View File


+ 4
- 4
docs/content/index.md View File

@@ -2,7 +2,7 @@ Title: CAP - A distributed transaction solution in micro-service base on eventua

# CAP

<img height="140" align="right" src="https://cap.dotnetcore.xyz/img/logo.svg">
<img width="140" align="right" src="https://cap.dotnetcore.xyz/img/logo.svg">
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/v8gfh6pe2u2laqoa/branch/master?svg=true)](https://ci.appveyor.com/project/yuleyule66/cap/branch/master)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
@@ -10,13 +10,13 @@ Title: CAP - A distributed transaction solution in micro-service base on eventua
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/dotnetcore/CAP/master/LICENSE.txt)
[![Member project of .NET Core Community](https://img.shields.io/badge/member%20project%20of-NCC-9e20c9.svg)](https://github.com/dotnetcore)

CAP is a library based on .net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficiently.
CAP is a library based on .net standard, which is a solution to deal with distributed transactions, also has the function of EventBus, it is lightweight, easy to use, and efficient.

## Introduction

In the process of building an SOA or MicroService system, we usually need to use the event to integrate each services. In the process, the simple use of message queue does not guarantee the reliability. CAP is adopted the local message table program integrated with the current database to solve the exception may occur in the process of the distributed system calling each other. It can ensure that the event messages are not lost in any case.
In the process of building an SOA or MicroService system, we usually need to use the event to integrate each service. In the process, simple use of message queue does not guarantee reliability. CAP adopts local message table program integrated with the current database to solve exceptions that may occur in the process of the distributed system calling each other. It can ensure that the event messages are not lost in any case.

You can also use the CAP as an EventBus. The CAP provides a simpler way to implement event publishing and subscriptions. You do not need to inherit or implement any interface during the process of subscription and sending.
You can also use CAP as an EventBus. CAP provides a simpler way to implement event publishing and subscriptions. You do not need to inherit or implement any interface during subscription and sending process.

!!! Tip "CAP implements the Outbox Pattern described in the [eShop ebook](https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events#designing-atomicity-and-resiliency-when-publishing-to-the-event-bus)"
<img src="img/architecture-eshop.png">


+ 19
- 19
docs/content/user-guide/en/cap/configuration.md View File

@@ -1,6 +1,6 @@
# Configuration

By default, you can specify the configuration when you register the CAP service into the IoC container for ASP.NET Core project.
By default, you can specify configuration when you register CAP services into the IoC container for ASP.NET Core project.

```c#
services.AddCap(config=>
@@ -9,13 +9,13 @@ services.AddCap(config=>
});
```

The `services` is `IServiceCollection` interface,which is under the `Microsoft.Extensions.DependencyInjection`.
`services` is `IServiceCollection` interface, which can be found in the `Microsoft.Extensions.DependencyInjection` package.

If you don't want to use Microsoft's IoC container, you can view ASP.NET Core documentation [here](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-2.2#default-service-container-replacement) to learn how to replace the default container implementation.
If you don't want to use Microsoft's IoC container, you can take a look at ASP.NET Core documentation [here](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-2.2#default-service-container-replacement) to learn how to replace the default container implementation.

## What is the minimum configuration?
## what is minimum configuration required for CAP

The simplest answer is that at least you have to configure a transport and a storage. If you want to get started quickly you can use the following configuration:
you have to configure at least a transport and a storage. If you want to get started quickly you can use the following configuration:

```C#
services.AddCap(capOptions =>
@@ -25,50 +25,50 @@ services.AddCap(capOptions =>
});
```

For specific transport and storage configuration, you can view the configuration items provided by the specific components in the [Transports](../transports/general.md) section and the [Persistent](../persistent/general.md) section.
For specific transport and storage configuration, you can take a look at the configuration options provided by the specific components in the [Transports](../transport/general.md) section and the [Persistent](../storage/general.md) section.

## Custom configuration

The `CapOptions` is used to store configuration information. By default they have the default values, and sometimes you may need to customize them.
The `CapOptions` is used to store configuration information. By default they have default values, sometimes you may need to customize them.

#### DefaultGroup

> Default: cap.queue.{assembly name}

The default consumer group name, corresponding to different names in different Transports, you can customize this value to customize the names in Transports for easy viewing.
The default consumer group name, corresponds to different names in different Transports, you can customize this value to customize the names in Transports for easy viewing.

!!! info "Mapping"
Map to [Queue Names](https://www.rabbitmq.com/queues.html#names) in RabbitMQ.
Map to [Consumer Group Id](http://kafka.apache.org/documentation/#group.id) in Apache Kafka.
Map to Subscription Name in Azure Service Bus.

#### Version
#### Versioning

> Default: v1

This is a new configuration item introduced in the CAP v2.4 version. It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. The following is its application scenario:
This is a new configuration option introduced in the CAP v2.4 version. It is used to specify a version of a message to isolate messages of different versions of the service. It is often used in A/B testing or multi-service version scenarios. Following are application scenarios that needs versioning:

!!! info "Business Iterative and compatible"
Due to the rapid iteration of services, the data structure of the message is not fixed during each service integration process. Sometimes we add or modify certain data structures to accommodate the newly introduced requirements. If you're a brand new system, there's no problem, but if your system is deployed to a production environment and serves customers, this will cause new features to be incompatible with the old data structure when they go online, and then these changes can cause serious problems. To work around this issue, you can only clean up message queues and persistent messages before starting the application, which is obviously fatal for production environments.
Due to the rapid iteration of services, the data structure of the message is not fixed during each service integration process. Sometimes we add or modify certain data structures to accommodate the newly introduced requirements. If you have a brand new system, there's no problem, but if your system is already deployed to a production environment and serves customers, this will cause new features to be incompatible with the old data structure when they go online, and then these changes can cause serious problems. To work around this issue, you can only clean up message queues and persistent messages before starting the application, which is obviously not acceptable for production environments.

!!! info "Multiple versions of the server"
Sometimes, the server's server needs to provide multiple sets of interfaces to support different versions of the app. The data structures of the same interface and server interaction of these different versions of the app may be different, so usually the server does not provide the same. Routing addresses to adapt to different versions of App calls.
Sometimes, the server's server needs to provide multiple sets of interfaces to support different versions of the app. Data structures of the same interface and server interaction of these different versions of the app may be different, so usually server does not provide the same routing addresses to adapt to different versions of App calls.

!!! info "Using the same persistent table/collection in different instance"
If you want multiple different instance services to use the same database, in versions prior to 2.4, we could isolate database tables for different instances by specifying different table names. That is to say, when configuring the CAP, it is implemented by configuring different table name prefixes.
If you want multiple different instance services to use the same database, in versions prior to 2.4, we could isolate database tables for different instances by specifying different table names. After version 2.4 this can be achived through CAP configuration, by configuring different table name prefixes.

> Check out the blog to learn more about Version feature: https://www.cnblogs.com/savorboard/p/cap-2-4.html
> Check out the blog to learn more about the Versioning feature: https://www.cnblogs.com/savorboard/p/cap-2-4.html

#### FailedRetryInterval

> Default: 60 sec

In the process of message message sent to transport failed, the CAP will be retry to sent. This configuration item is used to configure the interval between each retry.
During the message sending process if message transport fails, CAP will try to send the message again. This configuration option is used to configure the interval between each retry.

In the process of message consumption failed, the CAP will retry to execute. This configuration item is used to configure the interval between each retry.
During the message sending process if consumption method fails, CAP will try to execute the method again. This configuration option is used to configure the interval between each retry.

!!! WARNING "Retry & Interval"
By default, retry will start after **4 minutes** of failure to send or consume, in order to avoid possible problems caused by setting message state delays.
By default if failure occurs on send or consume, retry will start after **4 minutes** in order to avoid possible problems caused by setting message state delays.
Failures in the process of sending and consuming messages will be retried 3 times immediately, and will be retried polling after 3 times, at which point the FailedRetryInterval configuration will take effect.

#### ConsumerThreadCount
@@ -94,10 +94,10 @@ T1 : Message Type
T2 : Message Name
T3 : Message Content

Failure threshold callback. This action is called when the retry reaches the value set by `FailedRetryCount`, and you can receive the notification by specifying this parameter to make a manual intervention. For example, send an email or notify.
Failure threshold callback. This action is called when the retry reaches the value set by `FailedRetryCount`, you can receive notification by specifying this parameter to make a manual intervention. For example, send an email or notification.

#### SucceedMessageExpiredAfter

> Default: 24*3600 sec (1 days)

The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from persistent when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.
The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.

+ 7
- 7
docs/content/user-guide/en/cap/idempotence.md View File

@@ -8,7 +8,7 @@ Imdempotence (which you may read a formal definition of on [Wikipedia](https://e

Before we talk about idempotency, let's talk about the delivery of messages on the consumer side.

Since CAP is not a used MS DTC or other type of 2PC distributed transaction mechanism, there is a problem that at least the message is strictly delivered once. Specifically, in a message-based system, there are three possibilities:
Since CAP doesn't uses MS DTC or other type of 2PC distributed transaction mechanism, there is a problem that the message is strictly delivered at least once. Specifically, in a message-based system, there are three possibilities:

* Exactly Once(*)
* At Most Once
@@ -35,9 +35,9 @@ This type of delivery guarantee can arise from your messaging system and your co
2. Put message back into the queue
```

In the sunshine scenario, this is all well and good – your messages will be received, and work transactions will be committed, and you will be happy.
In the best case scenario, this is all well and good – your messages will be received, and work transactions will be committed, and you will be happy.

However, the sun does not always shine, and stuff tends to fail – especially if you do enough stuff. Consider e.g. what would happen if anything fails after having performed step (1), and then – when you try to execute step (4)/(2) (i.e. put the message back into the queue) – the network was temporarily unavailable, or the message broker restarted, or the host machine decided to reboot because it had installed an update.
However, the sun does not always shine, and stuff tends to fail – especially if you do alot of stuff. Consider e.g. what would happen if anything fails after having performed step (1), and then – when you try to execute step (4)/(2) (i.e. put the message back into the queue) – the network was temporarily unavailable, or the message broker restarted, or the host machine decided to reboot because it had installed an update.

This can be OK if it's what you want, but most things in CAP revolve around the concept of DURABLE messages, i.e. messages whose contents is just as important as the data in your database.

@@ -72,7 +72,7 @@ The fact that the "work transaction" is just a conceptual thing is what makes it

## Idempotence at CAP

In the CAP, the delivery guarantees we use is **At Least Once**.
In CAP, **At Least Once** delivery guarantee is used.

Since we have a temporary storage medium (database table), we may be able to do At Most Once, but in order to strictly guarantee that the message will not be lost, we do not provide related functions or configurations.

@@ -83,9 +83,9 @@ Since we have a temporary storage medium (database table), we may be able to do
There are a lot of reasons why the Consumer method fails. I don't know if the specific scene is blindly retrying or not retrying is an incorrect choice.
For example, if the consumer is debiting service, if the execution of the debit is successful, but fails to write the debit log, the CAP will judge that the consumer failed to execute and try again. If the client does not guarantee idempotency, the framework will retry it, which will inevitably lead to serious consequences for multiple debits.

2. The implementation of the Consumer method succeeded, but received the same message.
2. The execution of the Consumer method succeeded, but received the same message.

The scenario is also possible here. If the Consumer has been successfully executed at the beginning, but for some reason, such as the Broker recovery, and received the same message, the CAP will consider this a new after receiving the Broker message. The message will be executed again by the Consumer. Because it is a new message, the CAP cannot be idempotent at this time.
This scenario is also possible. If the Consumer has been successfully executed at the beginning, but for some reason, such as the Broker recovery, same message has been received, CAP will consider this as a new message after receiving the Broker message. Message will be executed again by the Consumer. Because it is a new message, CAP cannot be idempotent at this time.

3. The current data storage mode can not be idempotent.

@@ -95,7 +95,7 @@ Since we have a temporary storage medium (database table), we may be able to do

Many event-driven frameworks require users to ensure idempotent operations, such as ENode, RocketMQ, etc...

From an implementation point of view, CAP can do some less stringent idempotence, but strict idempotent cannot.
From an implementation point of view, CAP can do some less stringent idempotence, but strict idempotent can not be guaranteed.

### Naturally idempotent message processing



+ 96
- 11
docs/content/user-guide/en/cap/messaging.md View File

@@ -2,27 +2,112 @@

The data sent by using the `ICapPublisher` interface is called `Message`.

## Compensating transaction

Wiki :
[Compensating transaction](https://en.wikipedia.org/wiki/Compensating_transaction)

In some cases, consumers need to return the execution value to tell the publisher, so that the publisher can implement some compensation actions, usually we called message compensation.

Usually you can notify the upstream by republishing a new message in the consumer code. CAP provides a simple way to do this. You can specify `callbackName` parameter when publishing message, usually this only applies to point-to-point consumption. The following is an example.

For example, in an e-commerce application, the initial status of the order is pending, and the status is marked as succeeded when the product quantity is successfully deducted, otherwise it is failed.

```C#
// ============= Publisher =================

_capBus.Publish("place.order.qty.deducted",
contentObj: new { OrderId = 1234, ProductId = 23255, Qty = 1 },
callbackName: "place.order.mark.status");

// publisher using `callbackName` to subscribe consumer result

[CapSubscribe("place.order.mark.status")]
public void MarkOrderStatus(JToken param)
{
var orderId = param.Value<int>("OrderId");
var isSuccess = param.Value<bool>("IsSuccess");
if(isSuccess)
//mark order status to succeeded
else
//mark order status to failed
}

// ============= Consumer ===================

[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JToken param)
{
var orderId = param.Value<int>("OrderId");
var productId = param.Value<int>("ProductId");
var qty = param.Value<int>("Qty");

//business logic

return new { OrderId = orderId, IsSuccess = true };
}
```

## Heterogeneous system integration

In version 3.0+, we reconstructed the message structure. We used the Header in the message protocol in the message queue to transmit some additional information, so that we can do it in the Body without modifying or packaging the user’s original The message data format and content are sent.

This approach is reasonable. It helps to better integrate in heterogeneous systems. Compared with previous versions, users do not need to know the message structure used inside CAP to complete the integration work.

Now we divide the message into Header and Body for transmission.

The data in the body is the content of the original message sent by the user, that is, the content sent by calling the Publish method. We do not perform any packaging, but send it to the message queue after serialization.

In the Header, we need to pass some additional information so that the CAP can extract the key features for operation when the message is received.

The following is the content that needs to be written into the header of the message when sending a message in a heterogeneous system:

Key | DataType | Description
-- | --| --
cap-msg-id | string | Message Id, Generated by snowflake algorithm, can also be guid
cap-msg-name | string | The name of the message
cap-msg-type | string | The type of message, `typeof(T).FullName`(not required)
cap-senttime | string | sending time (not required)

Take the Java system sending RabbitMQ as an example:

```java

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("cap-msg-id", UUID.randomUUID().toString());
headers.put("cap-msg-name", routingKey);

channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes);
// messageBodyBytes = "json".getBytes(Charset.forName("UTF-8"))
// Note that messageBody defaults to byte[] of json. If other serialization is used, the deserializer needs to be customized on the CAP side

```
## Scheduling

After the CAP receives the message, it sends the message to Transport, which is transported by transport.
After CAP receives a message, it sends the message to Transport(RabitMq, Kafka...), which is transported by transport.
When you send using the `ICapPublisher` interface, the CAP will dispatch the message to the corresponding Transport. Currently, bulk messaging is not supported.
When you send message using the `ICapPublisher` interface, CAP will dispatch message to the corresponding Transport. Currently, bulk messaging is not supported.

For more information on transports, see the [Transports](../transports/general.md) section.
For more information on transports, see [Transports](../transport/general.md) section.

## Persistent
## Storage

The CAP will storage after receiving the message. For more information on storage, see the [Persistent](../persistent/general.md) section.
CAP will store the message after receiving it. For more information on storage, see the [Storage](../storage/general.md) section.

## Retry

Retrying plays an important role in the overall CAP architecture design, and CAPs retry for messages that fail to send or fail to execute. There are several retry strategies used throughout the CAP design process.
Retrying plays an important role in the overall CAP architecture design, CAP retry messages that fail to send or fail to execute. There are several retry strategies used throughout the CAP design process.

### Send retry

During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, the CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retries. When the total number of times reaches 50, the CAP will stop retrying.
During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50,CAP will stop retrying.

You can adjust the total number of default retries by setting `FailedRetryCount` in CapOptions.
You can adjust the total number of retries by setting `FailedRetryCount` in CapOptions.

It will stop when the maximum number of times is reached. You can see the reason for the failure in Dashboard and choose whether to manually retry.

@@ -32,10 +117,10 @@ The consumer method is executed when the Consumer receives the message and will

## Data Cleanup

There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, the status will be changed to `Successed`, and `ExpiresAt` will be set to **1 hour** later.
There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later.

Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later.

By default, the data of the message table is deleted **every hour** to avoid performance degradation caused by too much data. The cleanup strategy is `ExpiresAt` is not empty and is less than the current time.
By default, the data of the message in the table is deleted **every hour** to avoid performance degradation caused by too much data. The cleanup strategy `ExpiresAt` is performed when field is not empty and is less than the current time.

That is to say, the message with the status Failed (normally they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up.
That is to say, the message with the status Failed (by default they have been retried 50 times), if you do not have manual intervention for 15 days, it will **also be** cleaned up.

+ 5
- 5
docs/content/user-guide/en/cap/serialization.md View File

@@ -1,6 +1,6 @@
# Serialization

We provide the `ISerializer` interface to support serialization of messages. By default, we use json to serialize messages and store them in the database.
We provide the `ISerializer` interface to support serialization of messages. By default, json is used to serialize messages and store them in the database.

## Custom Serialization

@@ -19,7 +19,7 @@ public class YourSerializer: ISerializer
}
```

Then register your implementation in the container:
Then register your implemented serializer in the container:

```

@@ -32,9 +32,9 @@ services.AddCap

## Message Adapter (removed in v3.0)

In heterogeneous systems, sometimes you need to communicate with other systems, but other systems use message objects that may be different from CAP's [**Wrapper Object**](../persistent/general.md#_7). This time maybe you need to customize the message wapper.
In heterogeneous systems, sometimes you need to communicate with other systems, but other systems use message objects that may be different from CAP's [**Wrapper Object**](../storage/general.md#_7). This time maybe you need to customize the message wapper.

The CAP provides the `IMessagePacker` interface for customizing the [**Wrapper Object**](../persistent/general.md#_7). The custom MessagePacker usually packs and unpacks the `CapMessage` In this process you can add your own business objects.
CAP provides the `IMessagePacker` interface for customizing the [**Wrapper Object**](../storage/general.md#_7). Custom MessagePacker usually packs and unpacks the `CapMessage` In this process you can add your own business objects.

Usage :

@@ -76,7 +76,7 @@ class MyMessagePacker : IMessagePacker
}
```

Next, configure the custom `MyMessagePacker` to the service.
Next, add the custom `MyMessagePacker` to the service.

```csharp



+ 8
- 8
docs/content/user-guide/en/cap/transactions.md View File

@@ -4,7 +4,7 @@

CAP does not directly provide out-of-the-box MS DTC or 2PC-based distributed transactions, instead we provide a solution that can be used to solve problems encountered in distributed transactions.

In a distributed environment, using 2PC or DTC-based distributed transactions can be very expensive due to the overhead involved in communication, as is performance. In addition, since distributed transactions based on 2PC or DTC are also subject to the **CAP theorem**, it will have to give up availability (A in CAP) when network partitioning occurs.
In a distributed environment, using 2PC or DTC-based distributed transactions can be very expensive due to the overhead involved in communication which affects performance. In addition, since distributed transactions based on 2PC or DTC are also subject to the **CAP theorem**, it will have to give up availability (A in CAP) when network partitioning occurs.

> A distributed transaction is a very complex process with a lot of moving parts that can fail. Also, if these parts run on different machines or even in different data centers, the process of committing a transaction could become very long and unreliable.

@@ -27,9 +27,9 @@ For example, suppose we need to solve the following task:
* register a user profile
* do some automated background check that the user can actually access the system

The second task is to ensure, for example, that this user wasn’t banned from our servers for some reason.
Second task is to ensure, for example, that this user wasn’t banned from our servers for some reason.

But it could take time, and we’d like to extract it to a separate microservice. It wouldn’t be reasonable to keep the user waiting for so long just to know that she was registered successfully.
But it could take time, and we’d like to extract it to a separate microservice. It wouldn’t be reasonable to keep the user waiting for so long just to know that he was registered successfully.

**One way to solve it would be with a message-driven approach including compensation**. Let’s consider the following architecture:

@@ -37,13 +37,13 @@ But it could take time, and we’d like to extract it to a separate microservice
* the validation microservice tasked with doing a background check
* the messaging platform that supports persistent queues

The messaging platform could ensure that the messages sent by the microservices are persisted. Then they would be delivered at a later time if the receiver weren’t currently available
The messaging platform could ensure that the messages sent by the microservices are persisted. Then they would be delivered at a later time if the receiver wasn't currently available

#### Happy Scenario
#### Best case scenario

In this architecture, a happy scenario would be:
In this architecture, best case scenario would be:

* the user microservice registers a user, saving information about her in its local database
* the user microservice registers a user, saving information about him in its local database
* the user microservice marks this user with a flag. It could signify that this user hasn’t yet been validated and doesn’t have access to full system functionality
* a confirmation of registration is sent to the user with a warning that not all functionality of the system is accessible right away
* the user microservice sends a message to the validation microservice to do the background check of a user
@@ -51,7 +51,7 @@ In this architecture, a happy scenario would be:
* if the results are positive, the user microservice unblocks the user
* if the results are negative, the user microservice deletes the user account

After we’ve gone through all these steps, the system should be in a consistent state. However, for some period of time, the user entity appeared to be in an incomplete state.
After we’ve gone through all these steps, the system should be in a consistent state. However, for some period of time, user entity appeared to be in an incomplete state.

The last step, when the user microservice removes the invalid account, is a compensation phase.



+ 8
- 4
docs/content/user-guide/en/getting-started/introduction.md View File

@@ -2,16 +2,16 @@

CAP is an EventBus and a solution for solving distributed transaction problems in microservices or SOA systems. It helps create a microservices system that is scalable, reliable, and easy to change.

In Microsoft's [eShopOnContainer](https://github.com/dotnet-architecture/eShopOnContainers) microservices sample project, it is recommended to use CAP as the EventBus available in the production environment.
In Microsoft's [eShopOnContainer](https://github.com/dotnet-architecture/eShopOnContainers) microservices sample project, it is recommended to use CAP as the EventBus in the production environment.


!!! question "What is EventBus?"

An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know.
An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands events that are being sent and received, other components will never know about the substitution.

Compared to other Service Bus or Event Bus, CAP has its own characteristics. It does not require users to implement or inherit any interface when sending messages or processing messages. It has very high flexibility. We have always believed that the appointment is greater than the configuration, so the CAP is very simple to use, very friendly to the novice, and lightweight.
Compared to other Services Bus or Event Bus, CAP has its own characteristics. It does not require users to implement or inherit any interface when sending messages or processing messages. It has very high flexibility. We have always believed that the appointment is greater than the configuration, so the CAP is very simple to use, very friendly to the novice, and lightweight.

The CAP is modular in design and highly scalable. You have many options to choose from, including message queues, storage, serialization, etc. Many elements of the system can be replaced with custom implementations.
CAP is modular in design and highly scalable. You have many options to choose from, including message queues, storage, serialization, etc. Many elements of the system can be replaced with custom implementations.

## Related videos

@@ -25,6 +25,10 @@ The CAP is modular in design and highly scalable. You have many options to choos

[Article: Introduction and how to use](http://www.cnblogs.com/savorboard/p/cap.html)

[Article: New features in version 3.0](https://www.cnblogs.com/savorboard/p/cap-3-0.html)

[Article: New features in version 2.6](https://www.cnblogs.com/savorboard/p/cap-2-6.html)

[Article: New features in version 2.5](https://www.cnblogs.com/savorboard/p/cap-2-5.html)

[Article: New features in version 2.4](http://www.cnblogs.com/savorboard/p/cap-2-4.html)


+ 41
- 1
docs/content/user-guide/en/monitoring/consul.md View File

@@ -1,4 +1,44 @@
# Consul

Consul is a distributed service mesh to connect, secure, and configure services across any runtime platform and public or private cloud.
[Consul](https://www.consul.io/) is a distributed service mesh to connect, secure, and configure services across any runtime platform and public or private cloud.

## Consul Configuration for dashboard

CAP's Dashboard uses Consul as a service discovery to get the data of other nodes, and you can switch to the Servers page to see other nodes.

![](https://camo.githubusercontent.com/54c00c6ae65ce1d7b9109ed8cbcdca703a050c47/687474703a2f2f696d61676573323031372e636e626c6f67732e636f6d2f626c6f672f3235303431372f3230313731302f3235303431372d32303137313030343232313030313838302d313136323931383336322e706e67)

Click the `Switch` button to switch to the target node, CAP will use a proxy to get the data of the node you switched to.

The following is a configuration example, you need to configure them on each node.

```C#
services.AddCap(x =>
{
x.UseMySql(Configuration.GetValue<string>("ConnectionString"));
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(_ =>
{
_.DiscoveryServerHostName = "localhost";
_.DiscoveryServerPort = 8500;
_.CurrentNodeHostName = Configuration.GetValue<string>("ASPNETCORE_HOSTNAME");
_.CurrentNodePort = Configuration.GetValue<int>("ASPNETCORE_PORT");
_.NodeId = Configuration.GetValue<string>("NodeId");
_.NodeName = Configuration.GetValue<string>("NodeName");
});
});
```

Consul 1.6.2:

```
consul agent -dev
```

Windows 10, ASP.NET Core 3.1:

```sh
set ASPNETCORE_HOSTNAME=localhost&& set ASPNETCORE_PORT=5001&& dotnet run --urls=http://localhost:5001 NodeId=1 NodeName=CAP-1 ConnectionString="Server=localhost;Database=aaa;UserId=xxx;Password=xxx;"
set ASPNETCORE_HOSTNAME=localhost&& set ASPNETCORE_PORT=5002&& dotnet run --urls=http://localhost:5002 NodeId=2 NodeName=CAP-2 ConnectionString="Server=localhost;Database=bbb;UserId=xxx;Password=xxx;"
```

+ 4
- 4
docs/content/user-guide/en/monitoring/dashboard.md View File

@@ -1,6 +1,6 @@
# Dashboard

The CAP provides a Dashboard for viewing messages, and the features provided by Dashboard make it easy to view and manage messages.
CAP provides a Dashboard for viewing messages, and features provided by Dashboard make it easy to view and manage messages.

## Enable Dashboard

@@ -24,17 +24,17 @@ By default, you can open the Dashboard by visiting the url `http://localhost:xxx

> Default :'/cap'

You can change the path of the Dashboard by modifying this configuration item.
You can change the path of the Dashboard by modifying this configuration option.

* StatsPollingInterval

> Default: 2000ms

This configuration item is used to configure the Dashboard front end to get the polling time of the status interface (/stats).
This configuration option is used to configure the Dashboard front end to get the polling time of the status interface (/stats).

* Authorization

This configuration item is used to configure the authorization filter when accessing the Dashboard. The default filter allows LAN access. When your application wants to provide external network access, you can customize the authentication rules by setting this configuration. See the next section for details.
This configuration option is used to configure the authorization filter when accessing the Dashboard. The default filter allows LAN access. When your application wants to provide external network access, you can customize authentication rules by setting this configuration. See the next section for details.

### Custom authentication



+ 24
- 2
docs/content/user-guide/en/monitoring/diagnostics.md View File

@@ -1,6 +1,6 @@
# Diagnostics

Diagnostics provides a set of features that make it easy for us to document the critical operations that occur during the application's operation, their execution time, etc., allowing administrators to find the root cause of problems, especially in production environments.
Diagnostics provides a set of features that make it easy for us to document critical operations that occurs during the application's operation, their execution time, etc., allowing administrators to find the root cause of problems, especially in production environments.

## Diagnostics events

@@ -20,4 +20,26 @@ Diagnostics provides external event information as follows:
* After the subscriber method is executed
* Subscriber method execution exception

Related objects, you can find at the `DotNetCore.CAP.Diagnostics` namespace.
Related objects, you can find at the `DotNetCore.CAP.Diagnostics` namespace.


## Tracing CAP events in [Apache Skywalking](https://github.com/apache/skywalking)

Skywalking's C# client provides support for CAP Diagnostics. You can use [SkyAPM-dotnet](https://github.com/SkyAPM/SkyAPM-dotnet) to tracking.

Try to read the [README](https://github.com/SkyAPM/SkyAPM-dotnet/blob/master/README.md) to integrate it in your project.

Example tracking image :

![](https://user-images.githubusercontent.com/8205994/71006463-51025980-2120-11ea-82dc-bffa5530d515.png)


![](https://user-images.githubusercontent.com/8205994/71006589-7b541700-2120-11ea-910b-7e0f2dfddce8.png)

## Others APM support

There is currently no support for APMs other than Skywalking, and if you would like to support CAP diagnostic events in other APM, you can refer to the code here to implement it:

At present, apart from Skywalking, we have not provided support for other APMs. If you need it, you can refer the code [here](https://github.com/SkyAPM/SkyAPM-dotnet/tree/master/src/SkyApm.Diagnostics.CAP) to implementation, and we also welcome the Pull Request.

https://github.com/SkyAPM/SkyAPM-dotnet/tree/master/src/SkyApm.Diagnostics.CAP

+ 0
- 3
docs/content/user-guide/en/monitoring/metrics.md View File

@@ -1,3 +0,0 @@
# Metrics

TODO:

+ 0
- 34
docs/content/user-guide/en/persistent/in-memory-storage.md View File

@@ -1,34 +0,0 @@
# In-Memory Storage

Persistent storage of memory messages is often used in development and test environments, and if you use memory-based storage you lose the reliability of local transaction messages.

## Configuration

To use in-memory storage, you need to install the following extensions from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.InMemoryStorage
```

Next, add configuration items to the `ConfigureServices` method of `Startup.cs`.

```csharp

public void ConfigureServices(IServiceCollection services)
{
// ...

services.AddCap(x =>
{
x.UseInMemoryStorage();
// x.UseXXX ...
});
}

```

The successful message in memory, the CAP will be cleaned **every 5 minutes**.

## Publish with transaction

In-Memory Storage **Not supported** Transaction mode to send messages.

+ 1
- 1
docs/content/user-guide/en/samples/faq.md View File

@@ -6,7 +6,7 @@

!!! faq "Does it require certain different databases, one each for productor and resumer in CAP?"

Not requird differences necessary, a given advice is that using a special database for each program.
Not required differences necessary, a given advice is that using a special database for each program.

Otherwise, look at Q&A below.



+ 1
- 1
docs/content/user-guide/en/samples/github.md View File

@@ -1,5 +1,5 @@
# Github Samples

You can find the sample code at Github repository :
You can find the sample code at the Github repository:

https://github.com/dotnetcore/CAP/tree/master/samples

docs/content/user-guide/en/persistent/general.md → docs/content/user-guide/en/storage/general.md View File

@@ -1,18 +1,18 @@
# General

CAP need to use storage media with persistence capabilities to store event messages, such as through databases or other NoSql facilities. CAP uses this approach to deal with the loss of messages in all environments or network anomalies. The reliability of messages is the cornerstone of distributed transactions, so messages cannot be lost under any circumstances.
CAP needs to use storage media with persistence capabilities to store event messages in databases or other NoSql facilities. CAP uses this approach to deal with loss of messages in all environments or network anomalies. Reliability of messages is the cornerstone of distributed transactions, so messages cannot be lost under any circumstances.

## Persistent
## Persistence

### Before sent

Before the message enters the message queue, the CAP uses the local database table to persist the message, which ensures that the message is not lost when the message queue is abnormal or a network error occurs.
Before message enters the message queue, CAP uses the local database table to persist the message, which ensures that the message is not lost when the message queue is abnormal or a network error occurs.

To ensure the reliability of this mechanism, CAP uses the same database transactions as the business code to ensure that business operations and CAP messages are consistent in the persistence process. That is to say, in the process of message persistence, the database will be rolled back when any one of the exceptions occurs.

### After sent

After the message enters the message queue, the CAP will start the persistence function of the message queue. We need to explain how the CAP message is persisted in RabbitMQ and Kafka.
After the message enters the message queue, CAP will start the persistence function of the message queue. We need to explain how CAP message is persisted in RabbitMQ and Kafka.

For message persistence in RabbitMQ, CAP uses a consumer queue with message persistence, but there may be exceptions here.

@@ -23,7 +23,7 @@ Since Kafka is born with message persistence using files, Kafka will ensure that

## Storage

After the CAP started, two tables are generated into the persistent, by default the name is `Cap.Published` and `Cap.Received`.
After CAP is started, two tables are generated in used storage, by default the name is `Cap.Published` and `Cap.Received`.

### Storage Data Structure

@@ -56,7 +56,7 @@ StatusName | Status Name | string
### Wapper Object

When the CAP sends a message, it will store the original message object in a second package in the `Content` field.
When CAP sends a message, it will store original message object in a second package in the `Content` field.

The following is the **Wapper Object** data structure of Content field.

@@ -67,4 +67,15 @@ Timestamp | Message created time | string
Content | Message content | string
CallbackName | Consumer callback topic name | string

The `Id` field is generate using the mongo [objectid algorithm](https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb).
The `Id` field is generate using the mongo [objectid algorithm](https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb).


## Community-supported extensions

Thanks to the community for supporting CAP, the following is the implementation of community-supported storage

* SQLite ([@colinin](https://github.com/colinin)) :https://github.com/colinin/DotNetCore.CAP.Sqlite

* LiteDB ([@maikebing](https://github.com/maikebing)) :https://github.com/maikebing/CAP.Extensions

* SQLite & Oracle ([@cocosip](https://github.com/cocosip)) :https://github.com/cocosip/CAP-Extensions

+ 34
- 0
docs/content/user-guide/en/storage/in-memory-storage.md View File

@@ -0,0 +1,34 @@
# In-Memory Storage

In-memory storage is often used in development and test environments, and if you use memory-based storage you lose the reliability of local transaction messages.

## Configuration

To use in-memory storage, you need to install following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.InMemoryStorage
```

Next, add configuration items to the `ConfigureServices` method of `Startup.cs`.

```csharp

public void ConfigureServices(IServiceCollection services)
{
// ...

services.AddCap(x =>
{
x.UseInMemoryStorage();
// x.UseXXX ...
});
}

```

CAP will clean **every 5 minutes** Successful messages in memory.

## Publish with transaction

In-Memory Storage **does not support** Transaction mode to send messages.

docs/content/user-guide/en/persistent/mongodb.md → docs/content/user-guide/en/storage/mongodb.md View File

@@ -2,7 +2,7 @@

MongoDB is a cross-platform document-oriented database program. Classified as a NoSQL database program, MongoDB uses JSON-like documents with schema.

CAP has supported MongoDB as persistent since version 2.3 .
CAP supports MongoDB since version 2.3 .

MongoDB supports ACID transactions since version 4.0, so CAP only supports MongoDB above 4.0, and MongoDB needs to be deployed as a cluster, because MongoDB's ACID transaction requires a cluster to be used.

@@ -10,7 +10,7 @@ For a quick development of the MongoDB 4.0+ cluster for the development environm

## Configuration

To use MongoDB storage, you need to install the following extensions from NuGet:
To use MongoDB storage, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.MongoDB

docs/content/user-guide/en/persistent/mysql.md → docs/content/user-guide/en/storage/mysql.md View File

@@ -1,10 +1,10 @@
# MySQL

MySQL is an open-source relational database management system. CAP has supported MySQL as persistent.
MySQL is an open-source relational database management system. CAP supports MySQL database.

## Configuration

To use MySQL storage, you need to install the following extensions from NuGet:
To use MySQL storage, you need to install the following package from NuGet:
```powershell
PM> Install-Package DotNetCore.CAP.MySql

docs/content/user-guide/en/persistent/postgresql.md → docs/content/user-guide/en/storage/postgresql.md View File

@@ -1,10 +1,10 @@
# Postgre SQL

PostgreSQL is an open-source relational database management system. CAP has supported PostgreSQL as persistent.
PostgreSQL is an open-source relational database management system. CAP supports PostgreSQL database.

## Configuration

To use PostgreSQL storage, you need to install the following extensions from NuGet:
To use PostgreSQL storage, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.PostgreSql

docs/content/user-guide/en/persistent/sqlserver.md → docs/content/user-guide/en/storage/sqlserver.md View File

@@ -1,10 +1,13 @@
# SQL Server

SQL Server is a relational database management system developed by Microsoft. CAP has supported SQL Server as persistent.
SQL Server is a relational database management system developed by Microsoft. CAP supports SQL Server database.

!!! warning "Warning"
We currently use `Microsoft.Data.SqlClient` as the database driver, which is the future of SQL Server drivers, and we have abandoned `System.Data.SqlClient`, we suggest that you switch to.

## Configuration

To use SQL Server storage, you need to install the following extensions from NuGet:
To use SQL Server storage, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.SqlServer
@@ -21,7 +24,7 @@ public void ConfigureServices(IServiceCollection services)

services.AddCap(x =>
{
x.UsePostgreSql(opt=>{
x.UseSqlServer(opt=>{
//SqlServerOptions
});
// x.UseXXX ...

+ 90
- 0
docs/content/user-guide/en/transport/aws-sqs.md View File

@@ -0,0 +1,90 @@
# Amazon SQS

AWS SQS is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications.

AWS SNS is a highly available, durable, secure, fully managed pub/sub messaging service that enables you to decouple microservices, distributed systems, and serverless applications.

## How CAP uses AWS SNS and SQS

### SNS

Because CAP works based on the topic pattern, it needs to use AWS SNS, which simplifies the publish and subscribe architecture of messages.

When CAP startups, all subscription names will be registered as SNS topics, and you will see a list of all registered topics in the management console.

SNS does not support use of symbols such as `.` `:` as the name of the topic, so we replaced it. We replaced `.` with `-` and `:` with `_`

!!! note "Precautions"
Amazon SNS currently allows maximum size of published messages to be 256KB

For example, you have the following two subscriber methods in your current project

```C#
[CapSubscribe("sample.sns.foo")]
public void TestFoo(DateTime value)
{
}

[CapSubscribe("sample.sns.bar")]
public void TestBar(DateTime value)
{
}
```
After CAP startups, you will see in SNS management console:

![img](/img/aws-sns-demo.png)

### SQS

For each consumer group, CAP will create a corresponding SQS queue, the name of the queue is the name of the `DefaultGroup` in the configuration options, and the queue type is Standard.

The SQS queue will subscribe to Topic in SNS, as shown below:

![img](/img/aws-sns-demo.png)

!!! warning "Precautions"
Due to the limitation of AWS SNS, when you remove the subscription method, CAP will not delete topics or queues on AWS SNS or SQS, you need to delete them manually.


## Configuration

To use AWS SQS as the transport, you need to install the packages from NuGet:

```shell

Install-Package DotNetCore.CAP.AmazonSQS

```

Next, add configuration items to the `ConfigureServices` method of `Startup.cs`:

```csharp

public void ConfigureServices(IServiceCollection services)
{
// ...

services.AddCap(x =>
{
x.UseAmazonSQS(opt=>
{
//AmazonSQSOptions
});
// x.UseXXX ...
});
}

```

#### AmazonSQS Options

CAP 直接对外提供的 AmazonSQSOptions 配置参数如下:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Region | AWS 所处的区域 | Amazon.RegionEndpoint |
Credentials | AWS AK SK信息 | Amazon.Runtime.AWSCredentials |

如果你的项目运行在 AWS EC2 中,则不需要设置 Credentials,直接对 EC2 应用 IAM 策略即可。

Credentials 需要具有新增和订阅 SNS Topic,SQS Queue 等权限。

docs/content/user-guide/en/transports/azure-service-bus.md → docs/content/user-guide/en/transport/azure-service-bus.md View File

@@ -2,21 +2,19 @@

Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus is most commonly used to decouple applications and services from each other, and is a reliable and secure platform for asynchronous data and state transfer.

CAP supports Azure Service Bus as a message transporter.
Azure services can be used in CAP as a message transporter.

## Configuration

!!! warning "Requirement"
For the Service Bus pricing layer, CAP requires "standard" or "advanced" to support Topic functionality.

To use Azure Service Bus as a message transport, you need to install the following extensions from NuGet:
To use Azure Service Bus as a message transport, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.AzureServiceBus
```

Then you can add memory-based configuration items to the `ConfigureServices` method of `Startup.cs`.

Next, add configuration items to the `ConfigureServices` method of `Startup.cs`:

```csharp

@@ -38,7 +36,7 @@ public void ConfigureServices(IServiceCollection services)

#### AzureServiceBus Options

The AzureServiceBus configuration options provided directly by the CAP are as follows:
The AzureServiceBus configuration options provided directly by the CAP:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---

docs/content/user-guide/en/transports/general.md → docs/content/user-guide/en/transport/general.md View File

@@ -9,6 +9,7 @@ CAP supports several transport methods:
* [RabbitMQ](rabbitmq.md)
* [Kafka](kafka.md)
* [Azure Service Bus](azure-service-bus.md)
* [Amazon SQS](aws-sqs.md)
* [In-Memory Queue](in-memory-queue.md)

## How to select a transport
@@ -27,3 +28,15 @@ CAP supports several transport methods:
>`Kafka` vs `RabbitMQ` :
> https://stackoverflow.com/questions/42151544/is-there-any-reason-to-use-rabbitmq-over-kafka

## Community-supported extensions

Thanks to the community for supporting CAP, the following is the implementation of community-supported transport

* ActiveMQ (@[Lukas Zhang](https://github.com/lukazh/Lukaz.CAP.ActiveMQ)): https://github.com/lukazh

* RedisMQ ([@木木](https://github.com/difudotnet)) https://github.com/difudotnet/CAP.RedisMQ.Extensions

* ZeroMQ ([@maikebing](https://github.com/maikebing)): https://github.com/maikebing/CAP.Extensions




docs/content/user-guide/en/transports/in-memory-queue.md → docs/content/user-guide/en/transport/in-memory-queue.md View File

@@ -4,13 +4,14 @@ In Memory Queue is a memory-based message queue provided by [Community](https://

## Configuration

To use In Memory Queue as a message transporter, you need to install the following extensions from NuGet:
To use In Memory Queue as a message transporter, you need to install the following package from NuGet:

```powershell
PM> Install-Package Savorboard.CAP.InMemoryMessageQueue

```
Then you can add memory-based configuration items to the `ConfigureServices` method of `Startup.cs`.

Next, add configuration options to the `ConfigureServices` method of `Startup.cs`:

```csharp


docs/content/user-guide/en/transports/kafka.md → docs/content/user-guide/en/transport/kafka.md View File

@@ -2,18 +2,18 @@

[Apache Kafka®](https://kafka.apache.org/) is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java.

CAP has supported Kafka® as message transporter.
Kafka® can be used in CAP as a message transporter.

## Configuration

To use Kafka transporter, you need to install the following extensions from NuGet:
To use Kafka transporter, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.Kafka

```

Then you can add memory-based configuration items to the `ConfigureServices` method of `Startup.cs`.
Then you can add configuration items to the `ConfigureServices` method of `Startup.cs`.

```csharp

@@ -34,7 +34,7 @@ public void ConfigureServices(IServiceCollection services)

#### Kafka Options

The Kafka configuration parameters provided directly by the CAP are as follows:
The Kafka configuration parameters provided directly by the CAP:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
@@ -43,7 +43,7 @@ ConnectionPoolSize | connection pool size | int | 10

#### Kafka MainConfig Options

If you need **more** native Kakfa related configuration items, you can set it with the `MainConfig` configuration option:
If you need **more** native Kakfa related configuration options, you can set them in the `MainConfig` configuration option:

```csharp
services.AddCap(capOptions =>
@@ -56,6 +56,6 @@ services.AddCap(capOptions =>
});
```

`MainConfig` is a configuration dictionary, you can find a list of supported configuration items through the following link.
`MainConfig` is a configuration dictionary, you can find a list of supported configuration options through the following link.

[https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)

docs/content/user-guide/en/transports/rabbitmq.md → docs/content/user-guide/en/transport/rabbitmq.md View File

@@ -2,11 +2,11 @@

RabbitMQ is an open-source message-broker software that originally implemented the Advanced Message Queuing Protocol and has since been extended with a plug-in architecture to support Streaming Text Oriented Messaging Protocol, Message Queuing Telemetry Transport, and other protocols.

CAP has supported RabbitMQ as message transporter.
RabbitMQ can be used in CAP as a message transporter.

## Configuration

To use RabbitMQ transporter, you need to install the following extensions from NuGet:
To use RabbitMQ transporter, you need to install the following package from NuGet:

```powershell
PM> Install-Package DotNetCore.CAP.RabbitMQ
@@ -35,7 +35,7 @@ public void ConfigureServices(IServiceCollection services)

#### RabbitMQ Options

The RabbitMQ configuration parameters provided directly by the CAP are as follows:
The RabbitMQ configuration parameters provided directly by CAP:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
@@ -64,4 +64,12 @@ services.AddCap(x =>
});
});

```

#### How to connect cluster

using comma split connection string, like this:

```
x=> x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")
```

+ 46
- 3
docs/content/user-guide/zh/cap/messaging.md View File

@@ -6,6 +6,50 @@

你可以阅读 [quick-start](../getting-started/quick-start.md#_3) 来学习如何发送和处理消息。

## 补偿事务

[Compensating transaction](https://en.wikipedia.org/wiki/Compensating_transaction)

某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。

你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 `callbackName` 来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。

例如,在一个电商程序中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。

```C#
// ============= Publisher =================

_capBus.Publish("place.order.qty.deducted", new { OrderId = 1234, ProductId = 23255, Qty = 1 }, "place.order.mark.status");

// publisher using `callbackName` to subscribe consumer result

[CapSubscribe("place.order.mark.status")]
public void MarkOrderStatus(JToken param)
{
var orderId = param.Value<int>("OrderId");
var isSuccess = param.Value<bool>("IsSuccess");
if(isSuccess)
//mark order status to succeeded
else
//mark order status to failed
}

// ============= Consumer ===================

[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JToken param)
{
var orderId = param.Value<int>("OrderId");
var productId = param.Value<int>("ProductId");
var qty = param.Value<int>("Qty");

//business logic

return new { OrderId = orderId, IsSuccess = true };
}
```

## 异构系统集成

在 3.0+ 版本中,我们对消息结构进行了重构,我们利用了消息队列中消息协议中的 Header 来传输一些额外信息,以便于在 Body 中我们可以做到不需要修改或包装使用者的原始消息数据格式和内容进行发送。
@@ -45,18 +89,17 @@ channel.basicPublish(exchangeName, routingKey,

```


## 消息调度

CAP 接收到消息之后会将消息发送到 Transport, 由 Transport 进行运输。

当你使用 `ICapPublisher` 接口发送时,CAP将会将消息调度到相应的 Transport中去,目前还不支持批量发送消息。

有关 Transports 的更多信息,可以查看 [Transports](../transports/general.md) 章节。
有关 Transports 的更多信息,可以查看 [Transports](../transport/general.md) 章节。

## 消息存储

CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 [Persistent](../persistent/general.md) 章节。
CAP 接收到消息之后会将消息进行 Persistent(持久化), 有关 Persistent 的更多信息,可以查看 [Persistent](../storage/general.md) 章节。

## 消息重试



+ 2
- 2
docs/content/user-guide/zh/cap/serialization.md View File

@@ -34,9 +34,9 @@ services.AddCap

## 消息适配器 (v3.0移除 )

在异构系统中,有时候需要和其他系统进行通讯,但是其他系统使用的消息对象可能和 CAP 的[**包装器对象**](../persistent/general.md#_7)不一样,这个时候就需要对消息进行自定义适配。
在异构系统中,有时候需要和其他系统进行通讯,但是其他系统使用的消息对象可能和 CAP 的[**包装器对象**](../storage/general.md#_7)不一样,这个时候就需要对消息进行自定义适配。

CAP 提供了 `IMessagePacker` 接口用于对 [**包装器对象**](../persistent/general.md#_7) 进行自定义,自定义的 MessagePacker 通常是将 `CapMessage` 进行打包和解包操作,在这个过程中可以添加自己的业务对象。
CAP 提供了 `IMessagePacker` 接口用于对 [**包装器对象**](../storage/general.md#_7) 进行自定义,自定义的 MessagePacker 通常是将 `CapMessage` 进行打包和解包操作,在这个过程中可以添加自己的业务对象。

使用方法:



+ 4
- 0
docs/content/user-guide/zh/getting-started/introduction.md View File

@@ -25,6 +25,10 @@ CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可

[Article: CAP 介绍及使用](http://www.cnblogs.com/savorboard/p/cap.html)

[Article: CAP 3.0 版本中的新特性](https://www.cnblogs.com/savorboard/p/cap-3-0.html)

[Article: CAP 2.6 版本中的新特性](https://www.cnblogs.com/savorboard/p/cap-2-6.html)

[Article: CAP 2.5 版本中的新特性](https://www.cnblogs.com/savorboard/p/cap-2-5.html)

[Article: CAP 2.4 版本中的新特性](http://www.cnblogs.com/savorboard/p/cap-2-4.html)


+ 41
- 1
docs/content/user-guide/zh/monitoring/consul.md View File

@@ -1,4 +1,44 @@
# Consul

Consul is a distributed service mesh to connect, secure, and configure services across any runtime platform and public or private cloud.
[Consul](https://www.consul.io/) 是一个分布式服务网格,用于跨任何运行时平台和公共或私有云连接,保护和配置服务。

## Dashboard 中的 Consul 配置

CAP的 Dashboard 使用 Consul 作为服务发现来显示其他节点的数据,然后你就在任意节点的 Dashboard 中切换到 Servers 页面看到其他的节点。

![](https://camo.githubusercontent.com/54c00c6ae65ce1d7b9109ed8cbcdca703a050c47/687474703a2f2f696d61676573323031372e636e626c6f67732e636f6d2f626c6f672f3235303431372f3230313731302f3235303431372d32303137313030343232313030313838302d313136323931383336322e706e67)

通过点击 Switch 按钮来切换到其他的节点看到其他节点的数据,而不必访问很多地址来分别查看。
以下是一个配置示例, 你需要在每个节点分别配置:

```C#
services.AddCap(x =>
{
x.UseMySql(Configuration.GetValue<string>("ConnectionString"));
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseDiscovery(_ =>
{
_.DiscoveryServerHostName = "localhost";
_.DiscoveryServerPort = 8500;
_.CurrentNodeHostName = Configuration.GetValue<string>("ASPNETCORE_HOSTNAME");
_.CurrentNodePort = Configuration.GetValue<int>("ASPNETCORE_PORT");
_.NodeId = Configuration.GetValue<string>("NodeId");
_.NodeName = Configuration.GetValue<string>("NodeName");
});
});
```

Consul 1.6.2:

```
consul agent -dev
```

Windows 10, ASP.NET Core 3.1:

```sh
set ASPNETCORE_HOSTNAME=localhost&& set ASPNETCORE_PORT=5001&& dotnet run --urls=http://localhost:5001 NodeId=1 NodeName=CAP-1 ConnectionString="Server=localhost;Database=aaa;UserId=xxx;Password=xxx;"
set ASPNETCORE_HOSTNAME=localhost&& set ASPNETCORE_PORT=5002&& dotnet run --urls=http://localhost:5002 NodeId=2 NodeName=CAP-2 ConnectionString="Server=localhost;Database=bbb;UserId=xxx;Password=xxx;"
```

+ 3
- 3
docs/content/user-guide/zh/monitoring/diagnostics.md View File

@@ -1,9 +1,9 @@
# Diagnostics
# 性能追踪

Diagnostics 提供一组功能使我们能够很方便的可以记录在应用程序运行期间发生的关键性操作以及他们的执行时间等,使管理员可以查找特别是生产环境中出现问题所在的根本原因。


## CAP 中的 Diagnostics
## CAP 中的性能追踪

在 CAP 中,对 `DiagnosticSource` 提供了支持,监听器名称为 `CapDiagnosticListener`。

@@ -37,6 +37,6 @@ Skywalking 的 C# 客户端提供了对 CAP Diagnostics 的支持,你可以利

## 其他 APM 的支持

目前还没有实现对除了Skywalking的其他APM的支持,如果你想在其他 APM 中实现对 CAP 诊断事件的支持,你可以参考这里的代码来实现它:
目前还没有实现对除了 Skywalking 的其他APM的支持,如果你想在其他 APM 中实现对 CAP 诊断事件的支持,你可以参考这里的代码来实现它:

https://github.com/SkyAPM/SkyAPM-dotnet/tree/master/src/SkyApm.Diagnostics.CAP

+ 0
- 3
docs/content/user-guide/zh/monitoring/metrics.md View File

@@ -1,3 +0,0 @@
# Metrics

TODO:

docs/content/user-guide/zh/persistent/general.md → docs/content/user-guide/zh/storage/general.md View File

@@ -68,4 +68,10 @@ CallbackName | 回调的订阅者名称 | string

感谢社区对CAP的支持,以下是社区支持的持久化的实现

* SQLite ([@colinin](https://github.com/colinin)) : https://github.com/colinin/DotNetCore.CAP.Sqlite
* SQLite ([@colinin](https://github.com/colinin)) :https://github.com/colinin/DotNetCore.CAP.Sqlite

* LiteDB ([@maikebing](https://github.com/maikebing)) :https://github.com/maikebing/CAP.Extensions

* SQLite & Oracle ([@cocosip](https://github.com/cocosip)) :https://github.com/cocosip/CAP-Extensions

* SmartSql ([@xiangxiren](https://github.com/xiangxiren)) :https://github.com/xiangxiren/SmartSql.CAP

docs/content/user-guide/zh/persistent/in-memory-storage.md → docs/content/user-guide/zh/storage/in-memory-storage.md View File


docs/content/user-guide/zh/persistent/mongodb.md → docs/content/user-guide/zh/storage/mongodb.md View File


docs/content/user-guide/zh/persistent/mysql.md → docs/content/user-guide/zh/storage/mysql.md View File


docs/content/user-guide/zh/persistent/postgresql.md → docs/content/user-guide/zh/storage/postgresql.md View File


docs/content/user-guide/zh/persistent/sqlserver.md → docs/content/user-guide/zh/storage/sqlserver.md View File

@@ -2,6 +2,9 @@

SQL Server 是由微软开发的一个关系型数据库,你可以使用 SQL Server 来作为 CAP 消息的持久化。

!!! warning "注意"
我们目前使用 `Microsoft.Data.SqlClient` 作为数据库驱动程序,它是SQL Server 驱动的未来,并且已经放弃了 `System.Data.SqlClient`,我们建议你切换过去。

## 配置

要使用 SQL Server 存储,你需要从 NuGet 安装以下扩展包:
@@ -22,7 +25,7 @@ public void ConfigureServices(IServiceCollection services)

services.AddCap(x =>
{
x.UsePostgreSql(opt=>{
x.UseSqlServer(opt=>{
//SqlServerOptions
});
// x.UseXXX ...

+ 91
- 0
docs/content/user-guide/zh/transport/aws-sqs.md View File

@@ -0,0 +1,91 @@
# Amazon SQS

AWS SQS 是一种完全托管的消息队列服务,可让您分离和扩展微服务、分布式系统和无服务器应用程序。

AWS SNS 是一种高度可用、持久、安全、完全托管的发布/订阅消息收发服务,可以轻松分离微服务、分布式系统和无服务器应用程序。

## CAP 如何使用 AWS SNS & SQS

### SNS

由于 CAP 是基于 Topic 模式工作的,所以需要使用到 AWS SNS,SNS 简化了消息的发布订阅架构。

在 CAP 启动时会将所有的订阅名称注册为 SNS 的 Topic,你将会在管理控制台中看到所有已经注册的 Topic 列表。

由于 SNS 不支持使用 `.` `:` 等符号作为 Topic 的名称,所以我们进行了替换,我们将 `.` 替换为了 `-`,将 `:` 替换为了 `_`

!!! note "注意事项"
Amazon SNS 当前允许发布的消息最大大小为 256KB

举例,你的当前项目中有以下两个订阅者方法

```C#
[CapSubscribe("sample.sns.foo")]
public void TestFoo(DateTime value)
{
}

[CapSubscribe("sample.sns.bar")]
public void TestBar(DateTime value)
{
}
```

在 CAP 启动后,在 AWS SNS 中你将看到

![img](/img/aws-sns-demo.png)

### SQS

针对每个消费者组,CAP 将创建一个与之对应的 SQS 队列,队列的名称为配置项中 DefaultGroup 的名称,类型为 Standard Queue 。

该 SQS 队列将订阅 SNS 中的 Topic ,如下图:

![img](/img/aws-sns-demo.png)

!!! warning "注意事项"
由于 AWS SNS 的限制,当你减少订阅方法时,我们不会主动删除 AWS SNS 或者 SQS 上的相关 Topic 或 Queue,你需要手动删除他们。


## 配置

要使用 AWS SQS 作为消息传输器,你需要从 NuGet 安装以下扩展包:

```shell

Install-Package DotNetCore.CAP.AmazonSQS

```

然后,你可以在 `Startup.cs` 的 `ConfigureServices` 方法中添加基于 RabbitMQ 的配置项。

```csharp

public void ConfigureServices(IServiceCollection services)
{
// ...

services.AddCap(x =>
{
x.UseAmazonSQS(opt=>
{
//AmazonSQSOptions
});
// x.UseXXX ...
});
}

```

#### AmazonSQS Options

CAP 直接对外提供的 AmazonSQSOptions 配置参数如下:

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Region | AWS 所处的区域 | Amazon.RegionEndpoint |
Credentials | AWS AK SK信息 | Amazon.Runtime.AWSCredentials |

如果你的项目运行在 AWS EC2 中,则不需要设置 Credentials,直接对 EC2 应用 IAM 策略即可。

Credentials 需要具有新增和订阅 SNS Topic,SQS Queue 等权限。

docs/content/user-guide/zh/transports/azure-service-bus.md → docs/content/user-guide/zh/transport/azure-service-bus.md View File


docs/content/user-guide/zh/transports/general.md → docs/content/user-guide/zh/transport/general.md View File

@@ -9,12 +9,13 @@ CAP 支持以下几种运输方式:
* [RabbitMQ](rabbitmq.md)
* [Kafka](kafka.md)
* [Azure Service Bus](azure-service-bus.md)
* [Amazon SQS](aws-sqs.md)
* [In-Memory Queue](in-memory-queue.md)

## 怎么选择运输器

🏳‍🌈 | RabbitMQ | Kafka | Azure Service Bus | In-Memory
:-- | :--: | :--: | :--: | :-- :
:-- | :--: | :--: | :--: | :--:
**定位** | 可靠消息传输 | 实时数据处理 | 云 | 内存型,测试
**分布式** | ✔ | ✔ | ✔ |❌
**持久化** | ✔ | ✔ | ✔ | ❌
@@ -25,4 +26,14 @@ CAP 支持以下几种运输方式:
> http://geekswithblogs.net/michaelstephenson/archive/2012/08/12/150399.aspx

>`Kafka` vs `RabbitMQ` :
> https://stackoverflow.com/questions/42151544/is-there-any-reason-to-use-rabbitmq-over-kafka
> https://stackoverflow.com/questions/42151544/is-there-any-reason-to-use-rabbitmq-over-kafka

## 社区支持的运输器

感谢社区对CAP的支持,以下是社区支持的运输器实现

* ActiveMQ (@[Lukas Zhang](https://github.com/lukazh/Lukaz.CAP.ActiveMQ)): https://github.com/lukazh

* RedisMQ ([@木木](https://github.com/difudotnet)): https://github.com/difudotnet/CAP.RedisMQ.Extensions

* ZeroMQ ([@maikebing](https://github.com/maikebing)): https://github.com/maikebing/CAP.Extensions/tree/master/src/DotNetCore.CAP.ZeroMQ

docs/content/user-guide/zh/transports/in-memory-queue.md → docs/content/user-guide/zh/transport/in-memory-queue.md View File


docs/content/user-guide/zh/transports/kafka.md → docs/content/user-guide/zh/transport/kafka.md View File


docs/content/user-guide/zh/transports/rabbitmq.md → docs/content/user-guide/zh/transport/rabbitmq.md View File

@@ -66,3 +66,11 @@ services.AddCap(x =>
});

```

#### 如何连接 RabbitMQ 集群?

使用逗号分隔连接字符串即可,如下:

```
x=> x.UseRabbitMQ("localhost:5672,localhost:5673,localhost:5674")
```

+ 36
- 36
docs/mkdocs.yml View File

@@ -10,7 +10,7 @@ edit_uri: 'edit/master/docs/content'
docs_dir: 'content'

# Copyright
copyright: Copyright &copy; 2017 <a href="https://github.com/dotnetcore">NCC</a>, Maintained by the <a href="/about/contact-us/#cap-team">CAP Team</a>.
copyright: Copyright &copy; 2020 <a href="https://github.com/dotnetcore">NCC</a>, Maintained by the <a href="/about/contact-us/#cap-team">CAP Team</a>.


#theme: material
@@ -23,8 +23,8 @@ theme:
include_sidebar: true
logo: 'img/logo.svg'
favicon: 'img/favicon.ico'
feature:
tabs: true
features:
- tabs
i18n:
prev: 'Previous'
next: 'Next'
@@ -32,11 +32,11 @@ theme:
#Customization
extra:
social:
- type: 'github'
- icon: 'fontawesome/brands/github'
link: 'https://github.com/dotnetcore/CAP'
- type: 'twitter'
- icon: 'fontawesome/brands/twitter'
link: 'https://twitter.com/ncc_community'
- type: 'weibo'
- icon: 'fontawesome/brands/weibo'
link: 'https://weibo.com/dotnetcore'

# Extensions
@@ -81,24 +81,25 @@ nav:
- CAP:
- Configuration: user-guide/en/cap/configuration.md
- Messaging: user-guide/en/cap/messaging.md
- Sagas: user-guide/en/cap/sagas.md
- Serialization: user-guide/en/cap/serialization.md
- Transactions: user-guide/en/cap/transactions.md
- Idempotence: user-guide/en/cap/idempotence.md
- Transports:
- General: user-guide/en/transports/general.md
- RabbitMQ: user-guide/en/transports/rabbitmq.md
- Apache Kafka®: user-guide/en/transports/kafka.md
- Azure Service Bus: user-guide/en/transports/azure-service-bus.md
- In-Memory Queue: user-guide/en/transports/in-memory-queue.md
- Persistent:
- General: user-guide/en/persistent/general.md
- SQL Server: user-guide/en/persistent/sqlserver.md
- MySQL: user-guide/en/persistent/mysql.md
- PostgreSql: user-guide/en/persistent/postgresql.md
- MongoDB: user-guide/en/persistent/mongodb.md
- In-Memory: user-guide/en/persistent/in-memory-storage.md
- Transport:
- General: user-guide/en/transport/general.md
- RabbitMQ: user-guide/en/transport/rabbitmq.md
- Apache Kafka®: user-guide/en/transport/kafka.md
- Azure Service Bus: user-guide/en/transport/azure-service-bus.md
- Amazon SQS: user-guide/en/transport/aws-sqs.md
- In-Memory Queue: user-guide/en/transport/in-memory-queue.md
- Storage:
- General: user-guide/en/storage/general.md
- SQL Server: user-guide/en/storage/sqlserver.md
- MySQL: user-guide/en/storage/mysql.md
- PostgreSql: user-guide/en/storage/postgresql.md
- MongoDB: user-guide/en/storage/mongodb.md
- In-Memory: user-guide/en/storage/in-memory-storage.md
- Monitoring:
- Consul: user-guide/en/monitoring/consul.md
- Dashboard: user-guide/en/monitoring/dashboard.md
- Diagnostics: user-guide/en/monitoring/diagnostics.md
- Samples:
@@ -113,29 +114,28 @@ nav:
- CAP:
- 配置: user-guide/zh/cap/configuration.md
- 消息: user-guide/zh/cap/messaging.md
- Sagas: user-guide/zh/cap/sagas.md
- 序列化: user-guide/zh/cap/serialization.md
- 运输: user-guide/zh/cap/transactions.md
- 事务: user-guide/zh/cap/transactions.md
- 幂等性: user-guide/zh/cap/idempotence.md
- 传输:
- 简介: user-guide/zh/transports/general.md
- RabbitMQ: user-guide/zh/transports/rabbitmq.md
- Apache Kafka®: user-guide/zh/transports/kafka.md
- Azure Service Bus: user-guide/zh/transports/azure-service-bus.md
- In-Memory Queue: user-guide/zh/transports/in-memory-queue.md
- 持久化:
- 简介: user-guide/zh/persistent/general.md
- SQL Server: user-guide/zh/persistent/sqlserver.md
- MySQL: user-guide/zh/persistent/mysql.md
- PostgreSql: user-guide/zh/persistent/postgresql.md
- MongoDB: user-guide/zh/persistent/mongodb.md
- In-Memory: user-guide/zh/persistent/in-memory-storage.md
- 简介: user-guide/zh/transport/general.md
- RabbitMQ: user-guide/zh/transport/rabbitmq.md
- Apache Kafka®: user-guide/zh/transport/kafka.md
- Azure Service Bus: user-guide/zh/transport/azure-service-bus.md
- Amazon SQS: user-guide/zh/transport/aws-sqs.md
- In-Memory Queue: user-guide/zh/transport/in-memory-queue.md
- 存储:
- 简介: user-guide/zh/storage/general.md
- SQL Server: user-guide/zh/storage/sqlserver.md
- MySQL: user-guide/zh/storage/mysql.md
- PostgreSql: user-guide/zh/storage/postgresql.md
- MongoDB: user-guide/zh/storage/mongodb.md
- In-Memory: user-guide/zh/storage/in-memory-storage.md
- 监控:
- Consul: user-guide/zh/monitoring/consul.md
- Dashboard: user-guide/zh/monitoring/dashboard.md
- Diagnostics: user-guide/zh/monitoring/diagnostics.md
- 性能追踪: user-guide/zh/monitoring/diagnostics.md
- 健康检查: user-guide/zh/monitoring/health-checks.md
- Metrics: user-guide/zh/monitoring/metrics.md
- 示例:
- Github: user-guide/zh/samples/github.md
- eShopOnContainers: user-guide/zh/samples/eshoponcontainers.md


samples/Sample.Kafka.InMemory/Controllers/ValuesController.cs → samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs View File

@@ -3,7 +3,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Kafka.InMemory.Controllers
namespace Sample.AmazonSQS.InMemory.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
@@ -18,13 +18,13 @@ namespace Sample.Kafka.InMemory.Controllers
[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
await _capBus.PublishAsync("sample.azure.mysql2", DateTime.Now);
await _capBus.PublishAsync("sample.aws.in-memory", DateTime.Now);

return Ok();
}

[CapSubscribe("sample.azure.mysql2")]
public void Test2T2(DateTime value)
[CapSubscribe("sample.aws.in-memory")]
public void SubscribeInMemoryTopic(DateTime value)
{
Console.WriteLine("Subscriber output message: " + value);
}

samples/Sample.Kafka.InMemory/Program.cs → samples/Sample.AmazonSQS.InMemory/Program.cs View File

@@ -1,7 +1,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Sample.Kafka.InMemory
namespace Sample.AmazonSQS.InMemory
{
public class Program
{

samples/Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj → samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj View File

@@ -1,13 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>


samples/Sample.Kafka.InMemory/Startup.cs → samples/Sample.AmazonSQS.InMemory/Startup.cs View File

@@ -1,7 +1,8 @@
using Microsoft.AspNetCore.Builder;
using Amazon;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Kafka.InMemory
namespace Sample.AmazonSQS.InMemory
{
public class Startup
{
@@ -10,7 +11,7 @@ namespace Sample.Kafka.InMemory
services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseKafka("localhost:9092");
x.UseAmazonSQS(RegionEndpoint.CNNorthWest1);
x.UseDashboard();
});


samples/Sample.Kafka.InMemory/appsettings.json → samples/Sample.AmazonSQS.InMemory/appsettings.json View File

@@ -2,7 +2,7 @@
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
"Default": "Information"
}
}
}

+ 1
- 1
samples/Sample.ConsoleApp/Program.cs View File

@@ -16,7 +16,7 @@ namespace Sample.ConsoleApp
{
//console app does not support dashboard

x.UseMySql("Server=192.168.3.57;Port=3307;Database=captest;Uid=root;Pwd=123123;");
x.UseMySql("<ConnectionString>");
x.UseRabbitMQ(z =>
{
z.HostName = "192.168.3.57";


+ 2
- 2
samples/Sample.ConsoleApp/Sample.ConsoleApp.csproj View File

@@ -1,12 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
</ItemGroup>
<ItemGroup>


+ 2
- 2
samples/Sample.Kafka.PostgreSql/Sample.Kafka.PostgreSql.csproj View File

@@ -1,13 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<WarningsAsErrors>NU1701</WarningsAsErrors>
<NoWarn>NU1701</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Dapper" Version="2.0.78" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />


+ 1
- 1
samples/Sample.RabbitMQ.MongoDB/Sample.RabbitMQ.MongoDB.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>


+ 1
- 1
samples/Sample.RabbitMQ.MongoDB/Startup.cs View File

@@ -20,7 +20,7 @@ namespace Sample.RabbitMQ.MongoDB
services.AddCap(x =>
{
x.UseMongoDB(Configuration.GetConnectionString("MongoDB"));
x.UseRabbitMQ("192.168.2.120");
x.UseRabbitMQ("");
x.UseDashboard();
});
services.AddControllers();


+ 2
- 2
samples/Sample.RabbitMQ.MySql/AppDbContext.cs View File

@@ -26,13 +26,13 @@ namespace Sample.RabbitMQ.MySql
}
public class AppDbContext : DbContext
{
public const string ConnectionString = "Server=localhost;Database=testcap;UserId=root;Password=123123;";
public const string ConnectionString = "";

public DbSet<Person> Persons { get; set; }

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql(ConnectionString);
optionsBuilder.UseMySql(ConnectionString, ServerVersion.FromString("mysql"));
}
}
}

+ 1
- 1
samples/Sample.RabbitMQ.MySql/Controllers/ValuesController.cs View File

@@ -4,7 +4,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
using MySqlConnector;

namespace Sample.RabbitMQ.MySql.Controllers
{


+ 3
- 3
samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj View File

@@ -1,12 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="3.1.1" />
<PackageReference Include="Dapper" Version="2.0.78" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="5.0.0-alpha.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />


+ 2
- 14
samples/Sample.RabbitMQ.SqlServer/AppDbContext.cs View File

@@ -12,23 +12,11 @@ namespace Sample.RabbitMQ.SqlServer
{
return $"Name:{Name}, Id:{Id}";
}
}

public class Person2
{
public int Id { get; set; }

public string Name { get; set; }

public override string ToString()
{
return $"Name:{Name}, Id:{Id}";
}
}
}

public class AppDbContext : DbContext
{
public const string ConnectionString = "Server=192.168.2.120;Database=captest;User Id=sa;Password=P@ssw0rd;";
public const string ConnectionString = "";

public DbSet<Person> Persons { get; set; }



+ 18
- 8
samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs View File

@@ -1,8 +1,8 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP;
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Data.SqlClient;

@@ -21,7 +21,7 @@ namespace Sample.RabbitMQ.SqlServer.Controllers
[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
await _capBus.PublishAsync("sample.rabbitmq.mysql", new Person()
await _capBus.PublishAsync("sample.rabbitmq.sqlserver", new Person()
{
Id = 123,
Name = "Bar"
@@ -40,7 +40,11 @@ namespace Sample.RabbitMQ.SqlServer.Controllers
//your business code
connection.Execute("insert into test(name) values('test')", transaction: transaction);

_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
_capBus.Publish("sample.rabbitmq.sqlserver", new Person()
{
Id = 123,
Name = "Bar"
});
}
}

@@ -54,22 +58,28 @@ namespace Sample.RabbitMQ.SqlServer.Controllers
{
dbContext.Persons.Add(new Person() { Name = "ef.transaction" });

_capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
_capBus.Publish("sample.rabbitmq.sqlserver", new Person()
{
Id = 123,
Name = "Bar"
});
}
return Ok();
}

[NonAction]
[CapSubscribe("sample.rabbitmq.mysql")]
public void Subscriber(DateTime p)
[CapSubscribe("sample.rabbitmq.sqlserver")]
public void Subscriber(Person p)
{
Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
}

[NonAction]
[CapSubscribe("sample.rabbitmq.mysql", Group = "group.test2")]
public void Subscriber2(DateTime p, [FromCap]CapHeader header)
[CapSubscribe("sample.rabbitmq.sqlserver", Group = "group.test2")]
public void Subscriber2(Person p, [FromCap]CapHeader header)
{
var id = header[Headers.MessageId];

Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
}
}


+ 4
- 4
samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj View File

@@ -1,16 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.35" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="3.1.5">
<PackageReference Include="Dapper" Version="2.0.78" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.5" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.1" />
</ItemGroup>
<ItemGroup>


+ 1
- 1
samples/Sample.RabbitMQ.SqlServer/Startup.cs View File

@@ -15,7 +15,7 @@ namespace Sample.RabbitMQ.SqlServer
services.AddCap(x =>
{
x.UseEntityFramework<AppDbContext>();
x.UseRabbitMQ("192.168.2.120");
x.UseRabbitMQ("");
x.UseDashboard();
x.FailedRetryCount = 5;
x.FailedThresholdCallback = failed =>


+ 3
- 2
src/Directory.Build.props View File

@@ -18,14 +18,15 @@

<!-- Using SourceLink -->
<PropertyGroup>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
<PackageReference Include="JetBrains.Annotations" Version="2020.1.0" PrivateAssets="All" />
<PackageReference Include="JetBrains.Annotations" Version="2020.3.0" PrivateAssets="All" />
</ItemGroup>

</Project>

+ 212
- 0
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs View File

@@ -0,0 +1,212 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
using Headers = DotNetCore.CAP.Messages.Headers;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly AmazonSQSOptions _amazonSQSOptions;

private IAmazonSimpleNotificationService _snsClient;
private IAmazonSQS _sqsClient;
private string _queueUrl = string.Empty;

public AmazonSQSConsumerClient(string groupId, IOptions<AmazonSQSOptions> options)
{
_groupId = groupId;
_amazonSQSOptions = options.Value;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}

Connect(initSNS: true, initSQS: false);

var topicArns = new List<string>();
foreach (var topic in topics)
{
var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws());

var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult();

topicArns.Add(createTopicResponse.TopicArn);
}

Connect(initSNS: false, initSQS: true);

_snsClient.SubscribeQueueToTopicsAsync(topicArns, _sqsClient, _queueUrl)
.GetAwaiter().GetResult();
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();

var request = new ReceiveMessageRequest(_queueUrl)
{
WaitTimeSeconds = 5,
MaxNumberOfMessages = 1
};

while (true)
{
var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult();

if (response.Messages.Count == 1)
{
var messageObj = JsonSerializer.Deserialize<SQSReceivedMessage>(response.Messages[0].Body);

var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value);
var body = messageObj.Message;

var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);

message.Headers.Add(Headers.Group, _groupId);

OnMessageReceived?.Invoke(response.Messages[0].ReceiptHandle, message);
}
else
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
}
}

public void Commit(object sender)
{
try
{
_sqsClient.DeleteMessageAsync(_queueUrl, (string)sender);
}
catch (InvalidIdFormatException ex)
{
InvalidIdFormatLog(ex.Message);
}
}

public void Reject(object sender)
{
try
{
// Visible again in 3 seconds
_sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3);
}
catch (MessageNotInflightException ex)
{
MessageNotInflightLog(ex.Message);
}
}

public void Dispose()
{
_sqsClient?.Dispose();
_snsClient?.Dispose();
}

public void Connect(bool initSNS = true, bool initSQS = true)
{
if (_snsClient != null && _sqsClient != null)
{
return;
}

if (_snsClient == null && initSNS)
{
ConnectionLock.Wait();

try
{
_snsClient = _amazonSQSOptions.Credentials != null
? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
: new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Region);
}
finally
{
ConnectionLock.Release();
}
}

if (_sqsClient == null && initSQS)
{
ConnectionLock.Wait();

try
{

_sqsClient = _amazonSQSOptions.Credentials != null
? new AmazonSQSClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
: new AmazonSQSClient(_amazonSQSOptions.Region);

// If provide the name of an existing queue along with the exact names and values
// of all the queue's attributes, <code>CreateQueue</code> returns the queue URL for
// the existing queue.
_queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl;
}
finally
{
ConnectionLock.Release();
}
}
}

#region private methods

private Task InvalidIdFormatLog(string exceptionMessage)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.InvalidIdFormat,
Reason = exceptionMessage
};

OnLog?.Invoke(null, logArgs);

return Task.CompletedTask;
}

private Task MessageNotInflightLog(string exceptionMessage)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.MessageNotInflight,
Reason = exceptionMessage
};

OnLog?.Invoke(null, logArgs);

return Task.CompletedTask;
}

#endregion
}
}

+ 31
- 0
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs View File

@@ -0,0 +1,31 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<AmazonSQSOptions> _amazonSQSOptions;

public AmazonSQSConsumerClientFactory(IOptions<AmazonSQSOptions> amazonSQSOptions)
{
_amazonSQSOptions = amazonSQSOptions;
}

public IConsumerClient Create(string groupId)
{
try
{
var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions);
return client;
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}

+ 17
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs View File

@@ -0,0 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Amazon;
using Amazon.Runtime;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
// ReSharper disable once InconsistentNaming
public class AmazonSQSOptions
{
public RegionEndpoint Region { get; set; }

public AWSCredentials Credentials { get; set; }
}
}

+ 30
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs View File

@@ -0,0 +1,30 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.AmazonSQS;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal sealed class AmazonSQSOptionsExtension : ICapOptionsExtension
{
private readonly Action<AmazonSQSOptions> _configure;

public AmazonSQSOptionsExtension(Action<AmazonSQSOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
services.Configure(_configure);
services.AddSingleton<ITransport, AmazonSQSTransport>();
services.AddSingleton<IConsumerClientFactory, AmazonSQSConsumerClientFactory>();
}
}
}

+ 30
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs View File

@@ -0,0 +1,30 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Amazon;
using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseAmazonSQS(this CapOptions options, RegionEndpoint region)
{
return options.UseAmazonSQS(opt => { opt.Region = region; });
}

public static CapOptions UseAmazonSQS(this CapOptions options, Action<AmazonSQSOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new AmazonSQSOptionsExtension(configure));

return options;
}
}
}

+ 23
- 0
src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj View File

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.AmazonSQS</AssemblyName>
<PackageTags>$(PackageTags);AmazonSQS;SQS</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.AmazonSQS.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.5.1.20" />
<PackageReference Include="AWSSDK.SQS" Version="3.5.0.49" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>

+ 125
- 0
src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs View File

@@ -0,0 +1,125 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSTransport : ITransport
{
private readonly ILogger _logger;
private readonly IOptions<AmazonSQSOptions> _sqsOptions;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private IAmazonSimpleNotificationService _snsClient;
private IDictionary<string, string> _topicArnMaps;

public AmazonSQSTransport(ILogger<AmazonSQSTransport> logger, IOptions<AmazonSQSOptions> sqsOptions)
{
_logger = logger;
_sqsOptions = sqsOptions;
}

public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", string.Empty);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
try
{
await TryAddTopicArns();

if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn))
{
string bodyJson = null;
if (message.Body != null)
{
bodyJson = Encoding.UTF8.GetString(message.Body);
}

var attributes = message.Headers.Where(x => x.Value != null).ToDictionary(x => x.Key,
x => new MessageAttributeValue
{
StringValue = x.Value,
DataType = "String"
});
var request = new PublishRequest(arn, bodyJson)
{
MessageAttributes = attributes
};

await _snsClient.PublishAsync(request);

_logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published.");
}
else
{
_logger.LogWarning($"Can't be found SNS topics for [{message.GetName().NormalizeForAws()}]");
}
return OperateResult.Success;
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};

return OperateResult.Failed(wrapperEx, errors);
}
}

public async Task<bool> TryAddTopicArns()
{
if (_topicArnMaps != null)
{
return true;
}

await _semaphore.WaitAsync();

try
{
_snsClient = _sqsOptions.Value.Credentials != null
? new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Credentials, _sqsOptions.Value.Region)
: new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Region);

if (_topicArnMaps == null)
{
_topicArnMaps = new Dictionary<string, string>();
var topics = await _snsClient.ListTopicsAsync();
topics.Topics.ForEach(x =>
{
var name = x.TopicArn.Split(':').Last();
_topicArnMaps.Add(name, x.TopicArn);
});

return true;
}
}
catch (Exception e)
{
_logger.LogError(e, "Init topics from aws sns error!");
}
finally
{
_semaphore.Release();
}

return false;
}
}
}

+ 18
- 0
src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs View File

@@ -0,0 +1,18 @@
using System.Collections.Generic;

namespace DotNetCore.CAP.AmazonSQS
{
class SQSReceivedMessage
{
public string Message { get; set; }

public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; }
}

class SQSReceivedMessageAttributes
{
public string Type { get; set; }

public string Value { get; set; }
}
}

+ 16
- 0
src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs View File

@@ -0,0 +1,16 @@
using System;

namespace DotNetCore.CAP.AmazonSQS
{
internal static class TopicNormalizer
{
public static string NormalizeForAws(this string origin)
{
if (origin.Length > 256)
{
throw new ArgumentOutOfRangeException(nameof(origin) + " character string length must between 1~256!");
}
return origin.Replace(".", "-").Replace(":", "_");
}
}
}

+ 4
- 4
src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.AzureServiceBus</AssemblyName>
<PackageTags>$(PackageTags);AzureServiceBus</PackageTags>
</PropertyGroup>
@@ -9,15 +9,15 @@
<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AzureServiceBus.xml</DocumentationFile>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.AzureServiceBus.xml</DocumentationFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.1.3" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.1.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</ItemGroup>

</Project>

+ 3
- 2
src/DotNetCore.CAP.AzureServiceBus/ITransport.AzureServiceBus.cs View File

@@ -42,7 +42,8 @@ namespace DotNetCore.CAP.AzureServiceBus
{
MessageId = transportMessage.GetId(),
Body = transportMessage.Body,
Label = transportMessage.GetName()
Label = transportMessage.GetName(),
CorrelationId = transportMessage.GetCorrelationId()
};

foreach (var header in transportMessage.Headers)
@@ -86,4 +87,4 @@ namespace DotNetCore.CAP.AzureServiceBus
}
}
}
}
}

+ 6
- 3
src/DotNetCore.CAP.Dashboard/CAP.DashboardMiddleware.cs View File

@@ -2,12 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Globalization;
using System.Net;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
using DotNetCore.CAP.Dashboard.Resources;
using DotNetCore.CAP.Persistence;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
@@ -36,7 +37,6 @@ namespace DotNetCore.CAP
{
app.UseMiddleware<GatewayProxyMiddleware>();
}

app.UseMiddleware<DashboardMiddleware>();
}

@@ -77,7 +77,7 @@ namespace DotNetCore.CAP
app.UseCapDashboard();

next(app);
};
};
}
}

@@ -106,6 +106,9 @@ namespace DotNetCore.CAP
return;
}

var userLanguages = context.Request.Headers["Accept-Language"].ToString();
Strings.Culture = userLanguages.Contains("zh-") ? new CultureInfo("zh-CN") : new CultureInfo("en-US");

// Update the path
var path = context.Request.Path;
var pathBase = context.Request.PathBase;


+ 2
- 1
src/DotNetCore.CAP.Dashboard/CAP.DashboardOptionsExtensions.cs View File

@@ -6,6 +6,7 @@ using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.GatewayProxy;
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester;
using DotNetCore.CAP.Serialization;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;

@@ -26,7 +27,7 @@ namespace DotNetCore.CAP
_options?.Invoke(dashboardOptions);
services.AddTransient<IStartupFilter, CapStartupFilter>();
services.AddSingleton(dashboardOptions);
services.AddSingleton(DashboardRoutes.Routes);
services.AddSingleton(x => DashboardRoutes.GetDashboardRoutes(x.GetRequiredService<ISerializer>()));
services.AddSingleton<IHttpRequester, HttpClientHttpRequester>();
services.AddSingleton<IHttpClientCache, MemoryHttpClientCache>();
services.AddSingleton<IRequestMapper, RequestMapper>();


+ 8
- 8
src/DotNetCore.CAP.Dashboard/DashboardRoutes.cs View File

@@ -10,7 +10,7 @@ using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.Dashboard
{
public static class DashboardRoutes
public class DashboardRoutes
{
private static readonly string[] Javascripts =
{
@@ -33,9 +33,9 @@ namespace DotNetCore.CAP.Dashboard
"cap.css"
};

static DashboardRoutes()
public static RouteCollection GetDashboardRoutes(ISerializer serializer)
{
Routes = new RouteCollection();
RouteCollection Routes = new RouteCollection();
Routes.AddRazorPage("/", x => new HomePage());
Routes.Add("/stats", new JsonStats());
Routes.Add("/health", new OkStats());
@@ -104,7 +104,7 @@ namespace DotNetCore.CAP.Dashboard
{
var msg = client.Storage.GetMonitoringApi().GetPublishedMessageAsync(messageId)
.GetAwaiter().GetResult();
msg.Origin = StringSerializer.DeSerialize(msg.Content);
msg.Origin = serializer.Deserialize(msg.Content);
client.RequestServices.GetService<IDispatcher>().EnqueueToPublish(msg);
});
Routes.AddPublishBatchCommand(
@@ -113,7 +113,7 @@ namespace DotNetCore.CAP.Dashboard
{
var msg = client.Storage.GetMonitoringApi().GetReceivedMessageAsync(messageId)
.GetAwaiter().GetResult();
msg.Origin = StringSerializer.DeSerialize(msg.Content);
msg.Origin = serializer.Deserialize(msg.Content);
client.RequestServices.GetService<ISubscribeDispatcher>().DispatchAsync(msg);
});

@@ -128,16 +128,16 @@ namespace DotNetCore.CAP.Dashboard
Routes.AddRazorPage("/nodes", x =>
{
var id = x.Request.Cookies["cap.node"];
var id = x.Request.Cookies.ContainsKey("cap.node") ? x.Request.Cookies["cap.node"] : string.Empty;
return new NodePage(id);
});

Routes.AddRazorPage("/nodes/node/(?<Id>.+)", x => new NodePage(x.UriMatch.Groups["Id"].Value));

#endregion Razor pages and commands
}

public static RouteCollection Routes { get; }
return Routes;
}

internal static string GetContentFolderNamespace(string contentFolder)
{


+ 2
- 4
src/DotNetCore.CAP.Dashboard/DotNetCore.CAP.Dashboard.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<NoWarn>1591</NoWarn>
</PropertyGroup>

@@ -40,8 +40,6 @@
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Consul" Version="1.6.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.5" />
</ItemGroup>

<ItemGroup>
@@ -232,6 +230,6 @@
<Generator>RazorGenerator</Generator>
<LastGenOutput>_SidebarMenu.generated.cs</LastGenOutput>
</None>
</ItemGroup>
</ItemGroup>

</Project>

+ 4
- 17
src/DotNetCore.CAP.Dashboard/JsonDispatcher.cs View File

@@ -2,10 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using System.Text.Json;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Dashboard
{
@@ -30,19 +28,8 @@ namespace DotNetCore.CAP.Dashboard
if (_command != null)
{
var result = _command(context);

var settings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[]
{
new StringEnumConverter
{
NamingStrategy = new CamelCaseNamingStrategy()
}
}
};
serialized = JsonConvert.SerializeObject(result, settings);
serialized = JsonSerializer.Serialize(result);
}

if (_jsonCommand != null)


+ 3
- 16
src/DotNetCore.CAP.Dashboard/JsonStats.cs View File

@@ -3,10 +3,8 @@

using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;

namespace DotNetCore.CAP.Dashboard
{
@@ -26,19 +24,8 @@ namespace DotNetCore.CAP.Dashboard
var value = metric.Func(page);
result.Add(metric.Name, value);
}

var settings = new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = new JsonConverter[]
{
new StringEnumConverter
{
NamingStrategy = new CamelCaseNamingStrategy()
}
}
};
var serialized = JsonConvert.SerializeObject(result, settings);
var serialized = JsonSerializer.Serialize(result);

context.Response.ContentType = "application/json";
await context.Response.WriteAsync(serialized);


+ 6
- 0
src/DotNetCore.CAP.Dashboard/NodeDiscovery/CAP.DiscoveryOptions.cs View File

@@ -13,6 +13,8 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery

public const string DefaultMatchPath = "/cap";

public const string DefaultScheme = "http";

public DiscoveryOptions()
{
DiscoveryServerHostName = DefaultDiscoveryServerHost;
@@ -22,6 +24,8 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
CurrentNodePort = DefaultCurrentNodePort;

MatchPath = DefaultMatchPath;

Scheme = DefaultScheme;
}

public string DiscoveryServerHostName { get; set; }
@@ -34,5 +38,7 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
public string NodeName { get; set; }

public string MatchPath { get; set; }

public string Scheme { get; set; }
}
}

+ 14
- 9
src/DotNetCore.CAP.Dashboard/NodeDiscovery/INodeDiscoveryProvider.Consul.cs View File

@@ -69,21 +69,26 @@ namespace DotNetCore.CAP.Dashboard.NodeDiscovery
{
try
{
var healthCheck = new AgentServiceCheck
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30),
Interval = TimeSpan.FromSeconds(10),
Status = HealthStatus.Passing
};

if (_options.Scheme.Equals("http", StringComparison.OrdinalIgnoreCase))
healthCheck.HTTP = $"http://{_options.CurrentNodeHostName}:{_options.CurrentNodePort}{_options.MatchPath}/health";
else if (_options.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase))
healthCheck.TCP = $"{_options.CurrentNodeHostName}:{_options.CurrentNodePort}";

return _consul.Agent.ServiceRegister(new AgentServiceRegistration
{
ID = _options.NodeId,
Name = _options.NodeName,
Address = _options.CurrentNodeHostName,
Port = _options.CurrentNodePort,
Tags = new[] {"CAP", "Client", "Dashboard"},
Check = new AgentServiceCheck
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30),
Interval = TimeSpan.FromSeconds(10),
Status = HealthStatus.Passing,
HTTP =
$"http://{_options.CurrentNodeHostName}:{_options.CurrentNodePort}{_options.MatchPath}/health"
}
Tags = new[] { "CAP", "Client", "Dashboard" },
Check = healthCheck
});
}
catch (Exception ex)


+ 5
- 5
src/DotNetCore.CAP.Dashboard/Pages/HomePage.cshtml View File

@@ -1,8 +1,8 @@
@* Generator: Template TypeVisibility: Internal GeneratePrettyNames: True *@
@using System.Text.Json
@using DotNetCore.CAP.Dashboard.Pages
@using DotNetCore.CAP.Dashboard.Resources
@using DotNetCore.CAP.Messages
@using Newtonsoft.Json
@inherits DotNetCore.CAP.Dashboard.RazorPage
@{
Layout = new LayoutPage(Strings.HomePage_Title);
@@ -52,12 +52,12 @@
</h3>

<div id="historyGraph"
data-published-succeeded="@JsonConvert.SerializeObject(publishedSucceeded)"
data-published-failed="@JsonConvert.SerializeObject(publishedFailed)"
data-published-succeeded="@JsonSerializer.Serialize(publishedSucceeded)"
data-published-failed="@JsonSerializer.Serialize(publishedFailed)"
data-published-succeeded-string="@Strings.HomePage_GraphHover_PSucceeded"
data-published-failed-string="@Strings.HomePage_GraphHover_PFailed"
data-received-succeeded="@JsonConvert.SerializeObject(receivedSucceeded)"
data-received-failed="@JsonConvert.SerializeObject(receivedFailed)"
data-received-succeeded="@JsonSerializer.Serialize(receivedSucceeded)"
data-received-failed="@JsonSerializer.Serialize(receivedFailed)"
data-received-succeeded-string="@Strings.HomePage_GraphHover_RSucceeded"
data-received-failed-string="@Strings.HomePage_GraphHover_RFailed">
</div>


+ 5
- 5
src/DotNetCore.CAP.Dashboard/Pages/HomePage.generated.cs View File

@@ -35,7 +35,7 @@ namespace DotNetCore.CAP.Dashboard.Pages
#line hidden
#line 5 "..\..\Pages\HomePage.cshtml"
using Newtonsoft.Json;
using System.Text.Json;
#line default
#line hidden
@@ -252,7 +252,7 @@ WriteLiteral("\r\n </h3>\r\n\r\n <div id=\"historyGraph\"\r\n

#line 55 "..\..\Pages\HomePage.cshtml"
Write(JsonConvert.SerializeObject(publishedSucceeded));
Write(JsonSerializer.Serialize(publishedSucceeded));

#line default
@@ -262,7 +262,7 @@ WriteLiteral("\"\r\n data-published-failed=\"");

#line 56 "..\..\Pages\HomePage.cshtml"
Write(JsonConvert.SerializeObject(publishedFailed));
Write(JsonSerializer.Serialize(publishedFailed));

#line default
@@ -292,7 +292,7 @@ WriteLiteral("\"\r\n data-received-succeeded=\"");

#line 59 "..\..\Pages\HomePage.cshtml"
Write(JsonConvert.SerializeObject(receivedSucceeded));
Write(JsonSerializer.Serialize(receivedSucceeded));

#line default
@@ -302,7 +302,7 @@ WriteLiteral("\"\r\n data-received-failed=\"");

#line 60 "..\..\Pages\HomePage.cshtml"
Write(JsonConvert.SerializeObject(receivedFailed));
Write(JsonSerializer.Serialize(receivedFailed));

#line default


+ 1
- 1
src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml View File

@@ -45,7 +45,7 @@
{
<td rowspan="@rowCount">@subscriber.Key</td>
}
<td>@column.Attribute.Name</td>
<td>@column.TopicName</td>
<td>
<span style="color: #00bcd4">@column.ImplTypeInfo.Name</span>:
<div class="job-snippet-code">


+ 1
- 1
src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs View File

@@ -200,7 +200,7 @@ WriteLiteral(" <td>");

#line 48 "..\..\Pages\SubscriberPage.cshtml"
Write(column.Attribute.Name);
Write(column.TopicName);

#line default


+ 2
- 2
src/DotNetCore.CAP.InMemoryStorage/DotNetCore.CAP.InMemoryStorage.csproj View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.InMemoryStorage</AssemblyName>
<PackageTags>$(PackageTags);InMemory</PackageTags>
</PropertyGroup>
@@ -9,5 +9,5 @@
<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
</Project>

+ 25
- 13
src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs View File

@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -19,20 +18,23 @@ namespace DotNetCore.CAP.InMemoryStorage
internal class InMemoryStorage : IDataStorage
{
private readonly IOptions<CapOptions> _capOptions;
private readonly ISerializer _serializer;

public InMemoryStorage(IOptions<CapOptions> capOptions)
public InMemoryStorage(IOptions<CapOptions> capOptions, ISerializer serializer)
{
_capOptions = capOptions;
_serializer = serializer;
}

public static ConcurrentDictionary<string, MemoryMessage> PublishedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>();
public static Dictionary<string, MemoryMessage> PublishedMessages { get; } = new Dictionary<string, MemoryMessage>();

public static ConcurrentDictionary<string, MemoryMessage> ReceivedMessages { get; } = new ConcurrentDictionary<string, MemoryMessage>();
public static Dictionary<string, MemoryMessage> ReceivedMessages { get; } = new Dictionary<string, MemoryMessage>();

public Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
PublishedMessages[message.DbId].StatusName = state;
PublishedMessages[message.DbId].ExpiresAt = message.ExpiresAt;
PublishedMessages[message.DbId].Content = _serializer.Serialize(message.Origin);
return Task.CompletedTask;
}

@@ -40,6 +42,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{
ReceivedMessages[message.DbId].StatusName = state;
ReceivedMessages[message.DbId].ExpiresAt = message.ExpiresAt;
ReceivedMessages[message.DbId].Content = _serializer.Serialize(message.Origin);
return Task.CompletedTask;
}

@@ -49,7 +52,7 @@ namespace DotNetCore.CAP.InMemoryStorage
{
DbId = content.GetId(),
Origin = content,
Content = StringSerializer.Serialize(content),
Content = _serializer.Serialize(content),
Added = DateTime.Now,
ExpiresAt = null,
Retries = 0
@@ -104,7 +107,7 @@ namespace DotNetCore.CAP.InMemoryStorage
Origin = mdMessage.Origin,
Group = group,
Name = name,
Content = StringSerializer.Serialize(mdMessage.Origin),
Content = _serializer.Serialize(mdMessage.Origin),
Retries = mdMessage.Retries,
Added = mdMessage.Added,
ExpiresAt = mdMessage.ExpiresAt,
@@ -118,10 +121,14 @@ namespace DotNetCore.CAP.InMemoryStorage
var removed = 0;
if (table == nameof(PublishedMessages))
{
var ids = PublishedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList();
var ids = PublishedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);
foreach (var id in ids)
{
if (PublishedMessages.TryRemove(id, out _))
if (PublishedMessages.Remove(id))
{
removed++;
}
@@ -129,15 +136,20 @@ namespace DotNetCore.CAP.InMemoryStorage
}
else
{
var ids = ReceivedMessages.Values.Where(x => x.ExpiresAt < timeout).Select(x => x.DbId).ToList();
var ids = ReceivedMessages.Values
.Where(x => x.ExpiresAt < timeout)
.Select(x => x.DbId)
.Take(batchCount);

foreach (var id in ids)
{
if (PublishedMessages.TryRemove(id, out _))
if (ReceivedMessages.Remove(id))
{
removed++;
}
}
}
}

return Task.FromResult(removed);
}

@@ -152,7 +164,7 @@ namespace DotNetCore.CAP.InMemoryStorage

foreach (var message in ret)
{
message.Origin = StringSerializer.DeSerialize(message.Content);
message.Origin = _serializer.Deserialize(message.Content);
}

return Task.FromResult(ret);
@@ -169,7 +181,7 @@ namespace DotNetCore.CAP.InMemoryStorage

foreach (var message in ret)
{
message.Origin = StringSerializer.DeSerialize(message.Content);
message.Origin = _serializer.Deserialize(message.Content);
}

return Task.FromResult(ret);


Some files were not shown because too many files changed in this diff

Loading…
Cancel
Save