From daf4efbf50b708c91f054297f9fd396d08f3b4c4 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Mon, 20 Jul 2020 23:21:13 +0800 Subject: [PATCH 01/11] Fix amazon sqs reject message bug --- .../AmazonSQSConsumerClient.cs | 50 ++++++++++++++++++- .../TopicNormalizer.cs | 2 +- .../Internal/IConsumerRegister.Default.cs | 6 +++ src/DotNetCore.CAP/Transport/MqLogType.cs | 6 ++- 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs index 2bd6ee4..75b933a 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; +using System.Threading.Tasks; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; using Amazon.SQS; @@ -103,12 +104,27 @@ namespace DotNetCore.CAP.AmazonSQS public void Commit(object sender) { - _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender); + try + { + _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender); + } + catch (InvalidIdFormatException ex) + { + InvalidIdFormatLog(ex.Message); + } } public void Reject(object sender) { - _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000); + try + { + // Visible again in 3 seconds + _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3); + } + catch (MessageNotInflightException ex) + { + MessageNotInflightLog(ex.Message); + } } public void Dispose() @@ -162,5 +178,35 @@ namespace DotNetCore.CAP.AmazonSQS } } } + + #region private methods + + private Task InvalidIdFormatLog(string exceptionMessage) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.InvalidIdFormat, + Reason = exceptionMessage + }; + + OnLog?.Invoke(null, logArgs); + + return Task.CompletedTask; + } + + private Task MessageNotInflightLog(string exceptionMessage) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.MessageNotInflight, + Reason = exceptionMessage + }; + + OnLog?.Invoke(null, logArgs); + + return Task.CompletedTask; + } + + #endregion } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs index f08c481..622fc2e 100644 --- a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs +++ b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs @@ -2,7 +2,7 @@ namespace DotNetCore.CAP.AmazonSQS { - public static class TopicNormalizer + internal static class TopicNormalizer { public static string NormalizeForAws(this string origin) { diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 7e32a68..65cb999 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -271,6 +271,12 @@ namespace DotNetCore.CAP.Internal case MqLogType.ExceptionReceived: _logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason); break; + case MqLogType.InvalidIdFormat: + _logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason); + break; + case MqLogType.MessageNotInflight: + _logger.LogError("AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason); + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index 3412b3a..659e004 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -18,7 +18,11 @@ namespace DotNetCore.CAP.Transport ServerConnError, //AzureServiceBus - ExceptionReceived + ExceptionReceived, + + //Amazon SQS + InvalidIdFormat, + MessageNotInflight } public class LogMessageEventArgs : EventArgs From ae471b4cae14ccad55f9177c37c5c5284aae1781 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Mon, 20 Jul 2020 23:21:46 +0800 Subject: [PATCH 02/11] Add AmazonSQS samples --- CAP.sln | 14 +++++++------- .../Controllers/ValuesController.cs | 8 ++++---- .../Program.cs | 2 +- .../Sample.AmazonSQS.InMemory.csproj} | 2 +- .../Startup.cs | 7 ++++--- .../appsettings.json | 2 +- 6 files changed, 18 insertions(+), 17 deletions(-) rename samples/{Sample.Kafka.InMemory => Sample.AmazonSQS.InMemory}/Controllers/ValuesController.cs (72%) rename samples/{Sample.Kafka.InMemory => Sample.AmazonSQS.InMemory}/Program.cs (93%) rename samples/{Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj => Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj} (82%) rename samples/{Sample.Kafka.InMemory => Sample.AmazonSQS.InMemory}/Startup.cs (79%) rename samples/{Sample.Kafka.InMemory => Sample.AmazonSQS.InMemory}/appsettings.json (75%) diff --git a/CAP.sln b/CAP.sln index c494fab..6e4e4ec 100644 --- a/CAP.sln +++ b/CAP.sln @@ -51,8 +51,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureService EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Dashboard", "src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj", "{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.InMemory", "samples\Sample.Kafka.InMemory\Sample.Kafka.InMemory.csproj", "{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{F6C5C676-AF05-46D5-A45D-442137B31898}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.PostgreSql", "samples\Sample.Kafka.PostgreSql\Sample.Kafka.PostgreSql.csproj", "{F1EF1D26-8A6B-403E-85B0-250DF44A4A7C}" @@ -67,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "sample EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory", "samples\Sample.AmazonSQS.InMemory\Sample.AmazonSQS.InMemory.csproj", "{B187DD15-092D-4B72-9807-50856607D237}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -121,10 +121,6 @@ Global {56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Debug|Any CPU.Build.0 = Debug|Any CPU {56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.ActiveCfg = Release|Any CPU {56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.Build.0 = Release|Any CPU - {1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.Build.0 = Release|Any CPU {F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.Build.0 = Debug|Any CPU {F6C5C676-AF05-46D5-A45D-442137B31898}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -153,6 +149,10 @@ Global {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU + {B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -170,7 +170,6 @@ Global {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} {63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {56FB261C-67AF-4715-9A46-4FA4FAB91B2C} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} - {1B0371D6-36A4-4C78-A727-8ED732FDBA1D} = {3A6B6931-A123-477A-9469-8B468B5385AF} {F6C5C676-AF05-46D5-A45D-442137B31898} = {3A6B6931-A123-477A-9469-8B468B5385AF} {F1EF1D26-8A6B-403E-85B0-250DF44A4A7C} = {3A6B6931-A123-477A-9469-8B468B5385AF} {F8EF381A-FE83-40B3-A63D-09D83851B0FB} = {10C0818D-9160-4B80-BB86-DDE925B64D43} @@ -178,6 +177,7 @@ Global {75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF} {43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Sample.Kafka.InMemory/Controllers/ValuesController.cs b/samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs similarity index 72% rename from samples/Sample.Kafka.InMemory/Controllers/ValuesController.cs rename to samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs index b382b41..e557d7e 100644 --- a/samples/Sample.Kafka.InMemory/Controllers/ValuesController.cs +++ b/samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; -namespace Sample.Kafka.InMemory.Controllers +namespace Sample.AmazonSQS.InMemory.Controllers { [Route("api/[controller]")] public class ValuesController : Controller, ICapSubscribe @@ -18,13 +18,13 @@ namespace Sample.Kafka.InMemory.Controllers [Route("~/without/transaction")] public async Task WithoutTransaction() { - await _capBus.PublishAsync("sample.azure.mysql2", DateTime.Now); + await _capBus.PublishAsync("sample.aws.in-memory", DateTime.Now); return Ok(); } - [CapSubscribe("sample.azure.mysql2")] - public void Test2T2(DateTime value) + [CapSubscribe("sample.aws.in-memory")] + public void SubscribeInMemoryTopic(DateTime value) { Console.WriteLine("Subscriber output message: " + value); } diff --git a/samples/Sample.Kafka.InMemory/Program.cs b/samples/Sample.AmazonSQS.InMemory/Program.cs similarity index 93% rename from samples/Sample.Kafka.InMemory/Program.cs rename to samples/Sample.AmazonSQS.InMemory/Program.cs index b0af402..5767a5e 100644 --- a/samples/Sample.Kafka.InMemory/Program.cs +++ b/samples/Sample.AmazonSQS.InMemory/Program.cs @@ -1,7 +1,7 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Hosting; -namespace Sample.Kafka.InMemory +namespace Sample.AmazonSQS.InMemory { public class Program { diff --git a/samples/Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj b/samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj similarity index 82% rename from samples/Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj rename to samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj index 0cde14f..c8ed0b3 100644 --- a/samples/Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj +++ b/samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj @@ -5,9 +5,9 @@ + - diff --git a/samples/Sample.Kafka.InMemory/Startup.cs b/samples/Sample.AmazonSQS.InMemory/Startup.cs similarity index 79% rename from samples/Sample.Kafka.InMemory/Startup.cs rename to samples/Sample.AmazonSQS.InMemory/Startup.cs index ae3ee2a..f01411c 100644 --- a/samples/Sample.Kafka.InMemory/Startup.cs +++ b/samples/Sample.AmazonSQS.InMemory/Startup.cs @@ -1,7 +1,8 @@ -using Microsoft.AspNetCore.Builder; +using Amazon; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; -namespace Sample.Kafka.InMemory +namespace Sample.AmazonSQS.InMemory { public class Startup { @@ -10,7 +11,7 @@ namespace Sample.Kafka.InMemory services.AddCap(x => { x.UseInMemoryStorage(); - x.UseKafka("localhost:9092"); + x.UseAmazonSQS(RegionEndpoint.CNNorthWest1); x.UseDashboard(); }); diff --git a/samples/Sample.Kafka.InMemory/appsettings.json b/samples/Sample.AmazonSQS.InMemory/appsettings.json similarity index 75% rename from samples/Sample.Kafka.InMemory/appsettings.json rename to samples/Sample.AmazonSQS.InMemory/appsettings.json index 20aa907..50fe9a3 100644 --- a/samples/Sample.Kafka.InMemory/appsettings.json +++ b/samples/Sample.AmazonSQS.InMemory/appsettings.json @@ -2,7 +2,7 @@ "Logging": { "IncludeScopes": false, "LogLevel": { - "Default": "Debug" + "Default": "Error" } } } From 3f2814f186371427a4ed47a8763e3b721520a76c Mon Sep 17 00:00:00 2001 From: Shane Rogers Date: Thu, 23 Jul 2020 16:38:57 +1200 Subject: [PATCH 03/11] Updated mongo db query. (#611) Updated query as it referenced the wrong collection as a result after 3 retries the old messages are never processed (<4min) --- src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs index e3e9df1..9275b31 100644 --- a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -27,8 +27,7 @@ namespace DotNetCore.CAP.MongoDB public MongoDBDataStorage( IOptions capOptions, IOptions options, - IMongoClient client, - ILogger logger) + IMongoClient client) { _capOptions = capOptions; _options = options; @@ -194,7 +193,7 @@ namespace DotNetCore.CAP.MongoDB public async Task> GetReceivedMessagesOfNeedRetry() { var fourMinAgo = DateTime.Now.AddMinutes(-4); - var collection = _database.GetCollection(_options.Value.PublishedCollection); + var collection = _database.GetCollection(_options.Value.ReceivedCollection); var queryResult = await collection .Find(x => x.Retries < _capOptions.Value.FailedRetryCount && x.Added < fourMinAgo @@ -217,4 +216,4 @@ namespace DotNetCore.CAP.MongoDB return new MongoDBMonitoringApi(_client, _options); } } -} \ No newline at end of file +} From 847899342e94b3e9904cd42c5c026c468c2de6eb Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 29 Jul 2020 16:36:59 +0800 Subject: [PATCH 04/11] Add file license header --- ...ConnectionExtensions.cs => IDbConnection.Extensions.cs} | 7 +++++-- ...ConnectionExtensions.cs => IDbConnection.Extensions.cs} | 7 +++++-- ...ConnectionExtensions.cs => IDbConnection.Extensions.cs} | 7 +++++-- 3 files changed, 15 insertions(+), 6 deletions(-) rename src/DotNetCore.CAP.MySql/{IDbConnectionExtensions.cs => IDbConnection.Extensions.cs} (92%) rename src/DotNetCore.CAP.PostgreSql/{IDbConnectionExtensions.cs => IDbConnection.Extensions.cs} (92%) rename src/DotNetCore.CAP.SqlServer/{IDbConnectionExtensions.cs => IDbConnection.Extensions.cs} (92%) diff --git a/src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs b/src/DotNetCore.CAP.MySql/IDbConnection.Extensions.cs similarity index 92% rename from src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs rename to src/DotNetCore.CAP.MySql/IDbConnection.Extensions.cs index e75527f..f203025 100644 --- a/src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs +++ b/src/DotNetCore.CAP.MySql/IDbConnection.Extensions.cs @@ -1,10 +1,13 @@ -using System; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; using System.ComponentModel; using System.Data; namespace DotNetCore.CAP.MySql { - internal static class IDbConnectionExtensions + internal static class DbConnectionExtensions { public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, params object[] sqlParams) diff --git a/src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs b/src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs similarity index 92% rename from src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs rename to src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs index b94f7f4..0a5ac1f 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs @@ -1,10 +1,13 @@ -using System; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; using System.ComponentModel; using System.Data; namespace DotNetCore.CAP.PostgreSql { - internal static class IDbConnectionExtensions + internal static class DbConnectionExtensions { public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, params object[] sqlParams) diff --git a/src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs b/src/DotNetCore.CAP.SqlServer/IDbConnection.Extensions.cs similarity index 92% rename from src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs rename to src/DotNetCore.CAP.SqlServer/IDbConnection.Extensions.cs index 0853ff3..ba7dbce 100644 --- a/src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs +++ b/src/DotNetCore.CAP.SqlServer/IDbConnection.Extensions.cs @@ -1,10 +1,13 @@ -using System; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; using System.ComponentModel; using System.Data; namespace DotNetCore.CAP.SqlServer { - internal static class IDbConnectionExtensions + internal static class DbConnectionExtensions { public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, params object[] sqlParams) From 0d603a7bdeb0eb605027430615479155a3cbbf95 Mon Sep 17 00:00:00 2001 From: Pascal Slegtenhorst Date: Thu, 30 Jul 2020 03:18:18 +0200 Subject: [PATCH 05/11] partial topic attributes (#617) * Added support for defining topic attribute partials * Updated readme * Small improvements to partial topic implementation. Co-authored-by: Pascal Slegtenhorst --- README.md | 15 +++++++ .../Pages/SubscriberPage.cshtml | 2 +- .../Pages/SubscriberPage.generated.cs | 2 +- src/DotNetCore.CAP/CAP.Attribute.cs | 14 +++---- .../Internal/ConsumerExecutorDescriptor.cs | 26 ++++++++++++ .../Internal/IConsumerRegister.Default.cs | 2 +- .../IConsumerServiceSelector.Default.cs | 40 ++++++++++--------- src/DotNetCore.CAP/Internal/TopicAttribute.cs | 10 ++++- .../ConsumerServiceSelectorTest.cs | 28 ++++++++++--- 9 files changed, 104 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 66586f3..540b68d 100644 --- a/README.md +++ b/README.md @@ -187,6 +187,21 @@ public void ConfigureServices(IServiceCollection services) }); } ``` +#### Use partials for topic subscriptions + +To group topic subscriptions on class level you're able to define a subscription on a method as a partial. Subscriptions on the message queue will then be a combination of the topic defined on the class and the topic defined on the method. In the following example the `Create(..)` function will be invoked when receiving a message on `customers.create` + +```c# +[CapSubscribe("customers")] +public class CustomersSubscriberService : ICapSubscribe +{ + [CapSubscribe("create", isPartial: true)] + public void Create(Customer customer) + { + } +} +``` + #### Subscribe Group diff --git a/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml b/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml index 0b6ba0d..3bd479e 100644 --- a/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml +++ b/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml @@ -45,7 +45,7 @@ { @subscriber.Key } - @column.Attribute.Name + @column.TopicName @column.ImplTypeInfo.Name
diff --git a/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs b/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs index b243943..3f05369 100644 --- a/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs +++ b/src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs @@ -200,7 +200,7 @@ WriteLiteral(" "); #line 48 "..\..\Pages\SubscriberPage.cshtml" - Write(column.Attribute.Name); + Write(column.TopicName); #line default diff --git a/src/DotNetCore.CAP/CAP.Attribute.cs b/src/DotNetCore.CAP/CAP.Attribute.cs index 40092a5..d77df33 100644 --- a/src/DotNetCore.CAP/CAP.Attribute.cs +++ b/src/DotNetCore.CAP/CAP.Attribute.cs @@ -10,13 +10,13 @@ using DotNetCore.CAP.Internal; namespace DotNetCore.CAP { public class CapSubscribeAttribute : TopicAttribute - { - public CapSubscribeAttribute(string name) - : base(name) - { - - } - + { + public CapSubscribeAttribute(string name, bool isPartial = false) + : base(name, isPartial) + { + + } + public override string ToString() { return Name; diff --git a/src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs b/src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs index b7a0f8f..7a5cc0a 100644 --- a/src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs +++ b/src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs @@ -20,7 +20,33 @@ namespace DotNetCore.CAP.Internal public TopicAttribute Attribute { get; set; } + public TopicAttribute ClassAttribute { get; set; } + public IList Parameters { get; set; } + + private string _topicName; + /// + /// Topic name based on both and . + /// + public string TopicName + { + get + { + if (_topicName == null) + { + if (ClassAttribute != null && Attribute.IsPartial) + { + // Allows class level attribute name to end with a '.' and allows methods level attribute to start with a '.'. + _topicName = $"{ClassAttribute.Name.TrimEnd('.')}.{Attribute.Name.TrimStart('.')}"; + } + else + { + _topicName = Attribute.Name; + } + } + return _topicName; + } + } } public class ParameterDescriptor diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 65cb999..c41c96d 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -79,7 +79,7 @@ namespace DotNetCore.CAP.Internal RegisterMessageProcessor(client); - client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name)); + client.Subscribe(matchGroup.Value.Select(x => x.TopicName)); client.Listening(_pollingDelay, _cts.Token); } diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index f77ed53..f8e278c 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -116,17 +116,24 @@ namespace DotNetCore.CAP.Internal protected IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null) { + var topicClassAttribute = typeInfo.GetCustomAttribute(true); + foreach (var method in typeInfo.DeclaredMethods) { - var topicAttr = method.GetCustomAttributes(true); - var topicAttributes = topicAttr as IList ?? topicAttr.ToList(); + var topicMethodAttributes = method.GetCustomAttributes(true); + + // Ignore partial attributes when no topic attribute is defined on class. + if (topicClassAttribute is null) + { + topicMethodAttributes = topicMethodAttributes.Where(x => x.IsPartial); + } - if (!topicAttributes.Any()) + if (!topicMethodAttributes.Any()) { continue; } - foreach (var attr in topicAttributes) + foreach (var attr in topicMethodAttributes) { SetSubscribeAttribute(attr); @@ -138,21 +145,14 @@ namespace DotNetCore.CAP.Internal IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any() }).ToList(); - yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters); + yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters, topicClassAttribute); } } } protected virtual void SetSubscribeAttribute(TopicAttribute attribute) { - if (attribute.Group == null) - { - attribute.Group = _capOptions.DefaultGroup + "." + _capOptions.Version; - } - else - { - attribute.Group = attribute.Group + "." + _capOptions.Version; - } + attribute.Group = (attribute.Group ?? _capOptions.DefaultGroup) + "." + _capOptions.Version; } private static ConsumerExecutorDescriptor InitDescriptor( @@ -160,11 +160,13 @@ namespace DotNetCore.CAP.Internal MethodInfo methodInfo, TypeInfo implType, TypeInfo serviceTypeInfo, - IList parameters) + IList parameters, + TopicAttribute classAttr = null) { var descriptor = new ConsumerExecutorDescriptor { Attribute = attr, + ClassAttribute = classAttr, MethodInfo = methodInfo, ImplTypeInfo = implType, ServiceTypeInfo = serviceTypeInfo, @@ -176,7 +178,7 @@ namespace DotNetCore.CAP.Internal private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList executeDescriptor) { - return executeDescriptor.FirstOrDefault(x => x.Attribute.Name.Equals(key, StringComparison.InvariantCultureIgnoreCase)); + return executeDescriptor.FirstOrDefault(x => x.TopicName.Equals(key, StringComparison.InvariantCultureIgnoreCase)); } private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList executeDescriptor) @@ -184,10 +186,10 @@ namespace DotNetCore.CAP.Internal var group = executeDescriptor.First().Attribute.Group; if (!_asteriskList.TryGetValue(group, out var tmpList)) { - tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0) + tmpList = executeDescriptor.Where(x => x.TopicName.IndexOf('*') >= 0) .Select(x => new RegexExecuteDescriptor { - Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."), + Name = ("^" + x.TopicName + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."), Descriptor = x }).ToList(); _asteriskList.TryAdd(group, tmpList); @@ -210,10 +212,10 @@ namespace DotNetCore.CAP.Internal if (!_poundList.TryGetValue(group, out var tmpList)) { tmpList = executeDescriptor - .Where(x => x.Attribute.Name.IndexOf('#') >= 0) + .Where(x => x.TopicName.IndexOf('#') >= 0) .Select(x => new RegexExecuteDescriptor { - Name = ("^" + x.Attribute.Name.Replace(".", "\\.") + "$").Replace("#", "[0-9_a-zA-Z\\.]+"), + Name = ("^" + x.TopicName.Replace(".", "\\.") + "$").Replace("#", "[0-9_a-zA-Z\\.]+"), Descriptor = x }).ToList(); _poundList.TryAdd(group, tmpList); diff --git a/src/DotNetCore.CAP/Internal/TopicAttribute.cs b/src/DotNetCore.CAP/Internal/TopicAttribute.cs index 559cd5f..9c0fca9 100644 --- a/src/DotNetCore.CAP/Internal/TopicAttribute.cs +++ b/src/DotNetCore.CAP/Internal/TopicAttribute.cs @@ -12,9 +12,10 @@ namespace DotNetCore.CAP.Internal [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple = true)] public abstract class TopicAttribute : Attribute { - protected TopicAttribute(string name) + protected TopicAttribute(string name, bool isPartial = false) { Name = name; + IsPartial = isPartial; } /// @@ -22,6 +23,13 @@ namespace DotNetCore.CAP.Internal /// public string Name { get; } + /// + /// Defines wether this attribute defines a topic subscription partial. + /// The defined topic will be combined with a topic subscription defined on class level, + /// which results for example in subscription on "class.method". + /// + public bool IsPartial { get; } + /// /// Default group name is CapOptions setting.(Assembly name) /// kafka --> groups.id diff --git a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs index 59f231d..bb9dcdb 100644 --- a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs +++ b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs @@ -29,15 +29,18 @@ namespace DotNetCore.CAP.Test var selector = _provider.GetRequiredService(); var candidates = selector.SelectCandidates(); - Assert.Equal(6, candidates.Count); + Assert.Equal(8, candidates.Count); } - [Fact] - public void CanFindSpecifiedTopic() + [Theory] + [InlineData("Candidates.Foo")] + [InlineData("Candidates.Foo3")] + [InlineData("Candidates.Foo4")] + public void CanFindSpecifiedTopic(string topic) { var selector = _provider.GetRequiredService(); var candidates = selector.SelectCandidates(); - var bestCandidates = selector.SelectBestCandidate("Candidates.Foo", candidates); + var bestCandidates = selector.SelectBestCandidate(topic, candidates); Assert.NotNull(bestCandidates); Assert.NotNull(bestCandidates.MethodInfo); @@ -116,7 +119,7 @@ namespace DotNetCore.CAP.Test public class CandidatesTopic : TopicAttribute { - public CandidatesTopic(string topicName) : base(topicName) + public CandidatesTopic(string topicName, bool isPartial = false) : base(topicName, isPartial) { } } @@ -129,6 +132,7 @@ namespace DotNetCore.CAP.Test { } + [CandidatesTopic("Candidates")] public class CandidatesFooTest : IFooTest, ICapSubscribe { [CandidatesTopic("Candidates.Foo")] @@ -144,6 +148,20 @@ namespace DotNetCore.CAP.Test Console.WriteLine("GetFoo2() method has bee excuted."); } + [CandidatesTopic("Foo3", isPartial: true)] + public Task GetFoo3() + { + Console.WriteLine("GetFoo3() method has bee excuted."); + return Task.CompletedTask; + } + + [CandidatesTopic(".Foo4", isPartial: true)] + public Task GetFoo4() + { + Console.WriteLine("GetFoo4() method has bee excuted."); + return Task.CompletedTask; + } + [CandidatesTopic("*.*.Asterisk")] [CandidatesTopic("*.Asterisk")] public void GetFooAsterisk() From 7e5210ed6a32ce81155c4eb88752ea21e39e6fa5 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 31 Jul 2020 20:19:10 +0800 Subject: [PATCH 06/11] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 540b68d..f1bfd74 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@

-# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md) +# CAP                     [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md) [![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP) [![AppVeyor](https://ci.appveyor.com/api/projects/status/v8gfh6pe2u2laqoa/branch/master?svg=true)](https://ci.appveyor.com/project/yang-xiaodong/cap/branch/master) [![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/) From 92eed9e7d5d33b314a111e45593ebfec7e486b2c Mon Sep 17 00:00:00 2001 From: Johan Date: Tue, 4 Aug 2020 18:33:26 +0800 Subject: [PATCH 07/11] Update README.zh-cn.md (#620) --- README.zh-cn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.zh-cn.md b/README.zh-cn.md index f0e771f..b5efb90 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -174,7 +174,7 @@ namespace xxx.Service { public interface ISubscriberService { - public void CheckReceivedMessage(DateTime datetime); + void CheckReceivedMessage(DateTime datetime); } public class SubscriberService: ISubscriberService, ICapSubscribe From f773714574e8c0800362832c862599f13e2ed14b Mon Sep 17 00:00:00 2001 From: Johan Date: Wed, 5 Aug 2020 18:08:16 +0800 Subject: [PATCH 08/11] Update README.md (#621) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f1bfd74..7949dd2 100644 --- a/README.md +++ b/README.md @@ -159,7 +159,7 @@ namespace BusinessCode.Service { public interface ISubscriberService { - public void CheckReceivedMessage(DateTime datetime); + void CheckReceivedMessage(DateTime datetime); } public class SubscriberService: ISubscriberService, ICapSubscribe From 110bdbe73bf8a0b805e7cf71b536ca54720f39d5 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 6 Aug 2020 11:48:07 +0800 Subject: [PATCH 09/11] Fix docs. #623 --- docs/content/user-guide/en/cap/messaging.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index 770d898..169549b 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -32,7 +32,7 @@ The consumer method is executed when the Consumer receives the message and will ## Data Cleanup -There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 hour** later. +There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later. Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later. From d029b6c30db679e1b83b839fca866f164094af3b Mon Sep 17 00:00:00 2001 From: mHalo Date: Thu, 6 Aug 2020 12:53:04 +0800 Subject: [PATCH 10/11] fixed #622 #624 (#625) --- src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs | 2 +- src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index 9cae360..ed5ee54 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -108,8 +108,8 @@ namespace DotNetCore.CAP.PostgreSql { Id = reader.GetInt64(index++), Version = reader.GetString(index++), - Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Name = reader.GetString(index++), + Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Content = reader.GetString(index++), Retries = reader.GetInt32(index++), Added = reader.GetDateTime(index++), diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs index f8e278c..eedf675 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs @@ -125,7 +125,7 @@ namespace DotNetCore.CAP.Internal // Ignore partial attributes when no topic attribute is defined on class. if (topicClassAttribute is null) { - topicMethodAttributes = topicMethodAttributes.Where(x => x.IsPartial); + topicMethodAttributes = topicMethodAttributes.Where(x => !x.IsPartial); } if (!topicMethodAttributes.Any()) From 079191b14bcd3a83881e192486de2a73ccd5f2af Mon Sep 17 00:00:00 2001 From: cBear Date: Fri, 7 Aug 2020 17:45:35 +0800 Subject: [PATCH 11/11] fix #624 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 接收列表中的分组和名称展示到了相反的位置 * 接收列表中的分组和名称展示到了相反的位置 --- src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs | 4 ++-- src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs index f4f68d0..106141f 100644 --- a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs @@ -124,8 +124,8 @@ SELECT { Id = reader.GetInt64(index++), Version = reader.GetString(index++), - Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Name = reader.GetString(index++), + Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Content = reader.GetString(index++), Retries = reader.GetInt32(index++), Added = reader.GetDateTime(index++), @@ -269,4 +269,4 @@ WHERE `Key` >= @minKey return mediumMessage; } } -} \ No newline at end of file +} diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index 0bbfc58..54dcacb 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -119,8 +119,8 @@ SELECT { Id = reader.GetInt64(index++), Version = reader.GetString(index++), - Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Name = reader.GetString(index++), + Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default, Content = reader.GetString(index++), Retries = reader.GetInt32(index++), Added = reader.GetDateTime(index++), @@ -271,4 +271,4 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < return await Task.FromResult(mediumMessage); } } -} \ No newline at end of file +}