Bladeren bron

Merge branch 'master' into supports/pulsar

master
Savorboard 4 jaren geleden
bovenliggende
commit
3b153308f6
27 gewijzigde bestanden met toevoegingen van 214 en 88 verwijderingen
  1. +12
    -19
      CAP.sln
  2. +17
    -2
      README.md
  3. +1
    -1
      README.zh-cn.md
  4. +1
    -1
      docs/content/user-guide/en/cap/messaging.md
  5. +4
    -4
      samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs
  6. +1
    -1
      samples/Sample.AmazonSQS.InMemory/Program.cs
  7. +1
    -2
      samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj
  8. +4
    -3
      samples/Sample.AmazonSQS.InMemory/Startup.cs
  9. +1
    -1
      samples/Sample.AmazonSQS.InMemory/appsettings.json
  10. +48
    -2
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
  11. +1
    -1
      src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs
  12. +1
    -1
      src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml
  13. +1
    -1
      src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs
  14. +3
    -4
      src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
  15. +5
    -2
      src/DotNetCore.CAP.MySql/IDbConnection.Extensions.cs
  16. +2
    -2
      src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
  17. +5
    -2
      src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs
  18. +1
    -1
      src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
  19. +5
    -2
      src/DotNetCore.CAP.SqlServer/IDbConnection.Extensions.cs
  20. +2
    -2
      src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs
  21. +7
    -7
      src/DotNetCore.CAP/CAP.Attribute.cs
  22. +26
    -0
      src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs
  23. +7
    -1
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  24. +21
    -19
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
  25. +9
    -1
      src/DotNetCore.CAP/Internal/TopicAttribute.cs
  26. +5
    -1
      src/DotNetCore.CAP/Transport/MqLogType.cs
  27. +23
    -5
      test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs

+ 12
- 19
CAP.sln Bestand weergeven

@@ -51,8 +51,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureService
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Dashboard", "src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj", "{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.InMemory", "samples\Sample.Kafka.InMemory\Sample.Kafka.InMemory.csproj", "{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer", "samples\Sample.RabbitMQ.SqlServer\Sample.RabbitMQ.SqlServer.csproj", "{F6C5C676-AF05-46D5-A45D-442137B31898}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.PostgreSql", "samples\Sample.Kafka.PostgreSql\Sample.Kafka.PostgreSql.csproj", "{F1EF1D26-8A6B-403E-85B0-250DF44A4A7C}"
@@ -67,9 +65,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "sample
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj", "{73408EA6-1025-463C-88BC-A20769E44BC4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory", "samples\Sample.AmazonSQS.InMemory\Sample.AmazonSQS.InMemory.csproj", "{B187DD15-092D-4B72-9807-50856607D237}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj", "{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -125,10 +123,6 @@ Global
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C}.Release|Any CPU.Build.0 = Release|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D}.Release|Any CPU.Build.0 = Release|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F6C5C676-AF05-46D5-A45D-442137B31898}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -157,14 +151,14 @@ Global
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU
{73408EA6-1025-463C-88BC-A20769E44BC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{73408EA6-1025-463C-88BC-A20769E44BC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{73408EA6-1025-463C-88BC-A20769E44BC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{73408EA6-1025-463C-88BC-A20769E44BC4}.Release|Any CPU.Build.0 = Release|Any CPU
{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}.Release|Any CPU.Build.0 = Release|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B187DD15-092D-4B72-9807-50856607D237}.Release|Any CPU.Build.0 = Release|Any CPU
{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -182,7 +176,6 @@ Global
{4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{56FB261C-67AF-4715-9A46-4FA4FAB91B2C} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{1B0371D6-36A4-4C78-A727-8ED732FDBA1D} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F6C5C676-AF05-46D5-A45D-442137B31898} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F1EF1D26-8A6B-403E-85B0-250DF44A4A7C} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{F8EF381A-FE83-40B3-A63D-09D83851B0FB} = {10C0818D-9160-4B80-BB86-DDE925B64D43}
@@ -190,8 +183,8 @@ Global
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{73408EA6-1025-463C-88BC-A20769E44BC4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{33C48DD1-5B7D-475B-B849-FFE1D9A4FBD1} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 17
- 2
README.md Bestand weergeven

@@ -2,7 +2,7 @@
<img height="140" src="https://cap.dotnetcore.xyz/img/logo.svg">
</p>

# CAP                       [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md)
# CAP                     [中文](https://github.com/dotnetcore/CAP/blob/master/README.zh-cn.md)
[![Travis branch](https://img.shields.io/travis/dotnetcore/CAP/master.svg?label=travis-ci)](https://travis-ci.org/dotnetcore/CAP)
[![AppVeyor](https://ci.appveyor.com/api/projects/status/v8gfh6pe2u2laqoa/branch/master?svg=true)](https://ci.appveyor.com/project/yang-xiaodong/cap/branch/master)
[![NuGet](https://img.shields.io/nuget/v/DotNetCore.CAP.svg)](https://www.nuget.org/packages/DotNetCore.CAP/)
@@ -159,7 +159,7 @@ namespace BusinessCode.Service
{
public interface ISubscriberService
{
public void CheckReceivedMessage(DateTime datetime);
void CheckReceivedMessage(DateTime datetime);
}

public class SubscriberService: ISubscriberService, ICapSubscribe
@@ -187,6 +187,21 @@ public void ConfigureServices(IServiceCollection services)
});
}
```
#### Use partials for topic subscriptions

To group topic subscriptions on class level you're able to define a subscription on a method as a partial. Subscriptions on the message queue will then be a combination of the topic defined on the class and the topic defined on the method. In the following example the `Create(..)` function will be invoked when receiving a message on `customers.create`

```c#
[CapSubscribe("customers")]
public class CustomersSubscriberService : ICapSubscribe
{
[CapSubscribe("create", isPartial: true)]
public void Create(Customer customer)
{
}
}
```


#### Subscribe Group



+ 1
- 1
README.zh-cn.md Bestand weergeven

@@ -174,7 +174,7 @@ namespace xxx.Service
{
public interface ISubscriberService
{
public void CheckReceivedMessage(DateTime datetime);
void CheckReceivedMessage(DateTime datetime);
}

public class SubscriberService: ISubscriberService, ICapSubscribe


+ 1
- 1
docs/content/user-guide/en/cap/messaging.md Bestand weergeven

@@ -32,7 +32,7 @@ The consumer method is executed when the Consumer receives the message and will

## Data Cleanup

There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 hour** later.
There is an `ExpiresAt` field in the database message table indicating the expiration time of the message. When the message is sent successfully, status will be changed to `Successed`, and `ExpiresAt` will be set to **1 day** later.

Consuming failure will change the message status to `Failed` and `ExpiresAt` will be set to **15 days** later.



samples/Sample.Kafka.InMemory/Controllers/ValuesController.cs → samples/Sample.AmazonSQS.InMemory/Controllers/ValuesController.cs Bestand weergeven

@@ -3,7 +3,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Kafka.InMemory.Controllers
namespace Sample.AmazonSQS.InMemory.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
@@ -18,13 +18,13 @@ namespace Sample.Kafka.InMemory.Controllers
[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
await _capBus.PublishAsync("sample.azure.mysql2", DateTime.Now);
await _capBus.PublishAsync("sample.aws.in-memory", DateTime.Now);

return Ok();
}

[CapSubscribe("sample.azure.mysql2")]
public void Test2T2(DateTime value)
[CapSubscribe("sample.aws.in-memory")]
public void SubscribeInMemoryTopic(DateTime value)
{
Console.WriteLine("Subscriber output message: " + value);
}

samples/Sample.Kafka.InMemory/Program.cs → samples/Sample.AmazonSQS.InMemory/Program.cs Bestand weergeven

@@ -1,7 +1,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Sample.Kafka.InMemory
namespace Sample.AmazonSQS.InMemory
{
public class Program
{

samples/Sample.Kafka.InMemory/Sample.Kafka.InMemory.csproj → samples/Sample.AmazonSQS.InMemory/Sample.AmazonSQS.InMemory.csproj Bestand weergeven

@@ -5,10 +5,9 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Kafka\DotNetCore.CAP.Kafka.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>


samples/Sample.Kafka.InMemory/Startup.cs → samples/Sample.AmazonSQS.InMemory/Startup.cs Bestand weergeven

@@ -1,7 +1,8 @@
using Microsoft.AspNetCore.Builder;
using Amazon;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Kafka.InMemory
namespace Sample.AmazonSQS.InMemory
{
public class Startup
{
@@ -10,7 +11,7 @@ namespace Sample.Kafka.InMemory
services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseKafka("localhost:9092");
x.UseAmazonSQS(RegionEndpoint.CNNorthWest1);
x.UseDashboard();
});


samples/Sample.Kafka.InMemory/appsettings.json → samples/Sample.AmazonSQS.InMemory/appsettings.json Bestand weergeven

@@ -2,7 +2,7 @@
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
"Default": "Error"
}
}
}

+ 48
- 2
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs Bestand weergeven

@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
@@ -103,12 +104,27 @@ namespace DotNetCore.CAP.AmazonSQS

public void Commit(object sender)
{
_sqsClient.DeleteMessageAsync(_queueUrl, (string)sender);
try
{
_sqsClient.DeleteMessageAsync(_queueUrl, (string)sender);
}
catch (InvalidIdFormatException ex)
{
InvalidIdFormatLog(ex.Message);
}
}

public void Reject(object sender)
{
_sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000);
try
{
// Visible again in 3 seconds
_sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3);
}
catch (MessageNotInflightException ex)
{
MessageNotInflightLog(ex.Message);
}
}

public void Dispose()
@@ -162,5 +178,35 @@ namespace DotNetCore.CAP.AmazonSQS
}
}
}

#region private methods

private Task InvalidIdFormatLog(string exceptionMessage)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.InvalidIdFormat,
Reason = exceptionMessage
};

OnLog?.Invoke(null, logArgs);

return Task.CompletedTask;
}

private Task MessageNotInflightLog(string exceptionMessage)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.MessageNotInflight,
Reason = exceptionMessage
};

OnLog?.Invoke(null, logArgs);

return Task.CompletedTask;
}

#endregion
}
}

+ 1
- 1
src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs Bestand weergeven

@@ -2,7 +2,7 @@

namespace DotNetCore.CAP.AmazonSQS
{
public static class TopicNormalizer
internal static class TopicNormalizer
{
public static string NormalizeForAws(this string origin)
{


+ 1
- 1
src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.cshtml Bestand weergeven

@@ -45,7 +45,7 @@
{
<td rowspan="@rowCount">@subscriber.Key</td>
}
<td>@column.Attribute.Name</td>
<td>@column.TopicName</td>
<td>
<span style="color: #00bcd4">@column.ImplTypeInfo.Name</span>:
<div class="job-snippet-code">


+ 1
- 1
src/DotNetCore.CAP.Dashboard/Pages/SubscriberPage.generated.cs Bestand weergeven

@@ -200,7 +200,7 @@ WriteLiteral(" <td>");

#line 48 "..\..\Pages\SubscriberPage.cshtml"
Write(column.Attribute.Name);
Write(column.TopicName);

#line default


+ 3
- 4
src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs Bestand weergeven

@@ -27,8 +27,7 @@ namespace DotNetCore.CAP.MongoDB
public MongoDBDataStorage(
IOptions<CapOptions> capOptions,
IOptions<MongoDBOptions> options,
IMongoClient client,
ILogger<MongoDBDataStorage> logger)
IMongoClient client)
{
_capOptions = capOptions;
_options = options;
@@ -194,7 +193,7 @@ namespace DotNetCore.CAP.MongoDB
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.PublishedCollection);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
&& x.Added < fourMinAgo
@@ -217,4 +216,4 @@ namespace DotNetCore.CAP.MongoDB
return new MongoDBMonitoringApi(_client, _options);
}
}
}
}

src/DotNetCore.CAP.MySql/IDbConnectionExtensions.cs → src/DotNetCore.CAP.MySql/IDbConnection.Extensions.cs Bestand weergeven

@@ -1,10 +1,13 @@
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.MySql
{
internal static class IDbConnectionExtensions
internal static class DbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)

+ 2
- 2
src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs Bestand weergeven

@@ -124,8 +124,8 @@ SELECT
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
@@ -269,4 +269,4 @@ WHERE `Key` >= @minKey
return mediumMessage;
}
}
}
}

src/DotNetCore.CAP.PostgreSql/IDbConnectionExtensions.cs → src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs Bestand weergeven

@@ -1,10 +1,13 @@
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.PostgreSql
{
internal static class IDbConnectionExtensions
internal static class DbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)

+ 1
- 1
src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs Bestand weergeven

@@ -108,8 +108,8 @@ namespace DotNetCore.CAP.PostgreSql
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),


src/DotNetCore.CAP.SqlServer/IDbConnectionExtensions.cs → src/DotNetCore.CAP.SqlServer/IDbConnection.Extensions.cs Bestand weergeven

@@ -1,10 +1,13 @@
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.ComponentModel;
using System.Data;

namespace DotNetCore.CAP.SqlServer
{
internal static class IDbConnectionExtensions
internal static class DbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
params object[] sqlParams)

+ 2
- 2
src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs Bestand weergeven

@@ -119,8 +119,8 @@ SELECT
{
Id = reader.GetInt64(index++),
Version = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Name = reader.GetString(index++),
Group = queryDto.MessageType == MessageType.Subscribe ? reader.GetString(index++) : default,
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
@@ -271,4 +271,4 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <
return await Task.FromResult(mediumMessage);
}
}
}
}

+ 7
- 7
src/DotNetCore.CAP/CAP.Attribute.cs Bestand weergeven

@@ -10,13 +10,13 @@ using DotNetCore.CAP.Internal;
namespace DotNetCore.CAP
{
public class CapSubscribeAttribute : TopicAttribute
{
public CapSubscribeAttribute(string name)
: base(name)
{
}
{
public CapSubscribeAttribute(string name, bool isPartial = false)
: base(name, isPartial)
{
}
public override string ToString()
{
return Name;


+ 26
- 0
src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs Bestand weergeven

@@ -20,7 +20,33 @@ namespace DotNetCore.CAP.Internal

public TopicAttribute Attribute { get; set; }

public TopicAttribute ClassAttribute { get; set; }

public IList<ParameterDescriptor> Parameters { get; set; }

private string _topicName;
/// <summary>
/// Topic name based on both <see cref="Attribute"/> and <see cref="ClassAttribute"/>.
/// </summary>
public string TopicName
{
get
{
if (_topicName == null)
{
if (ClassAttribute != null && Attribute.IsPartial)
{
// Allows class level attribute name to end with a '.' and allows methods level attribute to start with a '.'.
_topicName = $"{ClassAttribute.Name.TrimEnd('.')}.{Attribute.Name.TrimStart('.')}";
}
else
{
_topicName = Attribute.Name;
}
}
return _topicName;
}
}
}

public class ParameterDescriptor


+ 7
- 1
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs Bestand weergeven

@@ -79,7 +79,7 @@ namespace DotNetCore.CAP.Internal

RegisterMessageProcessor(client);

client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
client.Subscribe(matchGroup.Value.Select(x => x.TopicName));

client.Listening(_pollingDelay, _cts.Token);
}
@@ -271,6 +271,12 @@ namespace DotNetCore.CAP.Internal
case MqLogType.ExceptionReceived:
_logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason);
break;
case MqLogType.InvalidIdFormat:
_logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason);
break;
case MqLogType.MessageNotInflight:
_logger.LogError("AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason);
break;
default:
throw new ArgumentOutOfRangeException();
}


+ 21
- 19
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs Bestand weergeven

@@ -116,17 +116,24 @@ namespace DotNetCore.CAP.Internal

protected IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null)
{
var topicClassAttribute = typeInfo.GetCustomAttribute<TopicAttribute>(true);

foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttributes<TopicAttribute>(true);
var topicAttributes = topicAttr as IList<TopicAttribute> ?? topicAttr.ToList();
var topicMethodAttributes = method.GetCustomAttributes<TopicAttribute>(true);

// Ignore partial attributes when no topic attribute is defined on class.
if (topicClassAttribute is null)
{
topicMethodAttributes = topicMethodAttributes.Where(x => !x.IsPartial);
}

if (!topicAttributes.Any())
if (!topicMethodAttributes.Any())
{
continue;
}

foreach (var attr in topicAttributes)
foreach (var attr in topicMethodAttributes)
{
SetSubscribeAttribute(attr);

@@ -138,21 +145,14 @@ namespace DotNetCore.CAP.Internal
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
}).ToList();

yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters);
yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters, topicClassAttribute);
}
}
}

protected virtual void SetSubscribeAttribute(TopicAttribute attribute)
{
if (attribute.Group == null)
{
attribute.Group = _capOptions.DefaultGroup + "." + _capOptions.Version;
}
else
{
attribute.Group = attribute.Group + "." + _capOptions.Version;
}
attribute.Group = (attribute.Group ?? _capOptions.DefaultGroup) + "." + _capOptions.Version;
}

private static ConsumerExecutorDescriptor InitDescriptor(
@@ -160,11 +160,13 @@ namespace DotNetCore.CAP.Internal
MethodInfo methodInfo,
TypeInfo implType,
TypeInfo serviceTypeInfo,
IList<ParameterDescriptor> parameters)
IList<ParameterDescriptor> parameters,
TopicAttribute classAttr = null)
{
var descriptor = new ConsumerExecutorDescriptor
{
Attribute = attr,
ClassAttribute = classAttr,
MethodInfo = methodInfo,
ImplTypeInfo = implType,
ServiceTypeInfo = serviceTypeInfo,
@@ -176,7 +178,7 @@ namespace DotNetCore.CAP.Internal

private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name.Equals(key, StringComparison.InvariantCultureIgnoreCase));
return executeDescriptor.FirstOrDefault(x => x.TopicName.Equals(key, StringComparison.InvariantCultureIgnoreCase));
}

private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
@@ -184,10 +186,10 @@ namespace DotNetCore.CAP.Internal
var group = executeDescriptor.First().Attribute.Group;
if (!_asteriskList.TryGetValue(group, out var tmpList))
{
tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
tmpList = executeDescriptor.Where(x => x.TopicName.IndexOf('*') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{
Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."),
Name = ("^" + x.TopicName + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."),
Descriptor = x
}).ToList();
_asteriskList.TryAdd(group, tmpList);
@@ -210,10 +212,10 @@ namespace DotNetCore.CAP.Internal
if (!_poundList.TryGetValue(group, out var tmpList))
{
tmpList = executeDescriptor
.Where(x => x.Attribute.Name.IndexOf('#') >= 0)
.Where(x => x.TopicName.IndexOf('#') >= 0)
.Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
{
Name = ("^" + x.Attribute.Name.Replace(".", "\\.") + "$").Replace("#", "[0-9_a-zA-Z\\.]+"),
Name = ("^" + x.TopicName.Replace(".", "\\.") + "$").Replace("#", "[0-9_a-zA-Z\\.]+"),
Descriptor = x
}).ToList();
_poundList.TryAdd(group, tmpList);


+ 9
- 1
src/DotNetCore.CAP/Internal/TopicAttribute.cs Bestand weergeven

@@ -12,9 +12,10 @@ namespace DotNetCore.CAP.Internal
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Class, AllowMultiple = true)]
public abstract class TopicAttribute : Attribute
{
protected TopicAttribute(string name)
protected TopicAttribute(string name, bool isPartial = false)
{
Name = name;
IsPartial = isPartial;
}

/// <summary>
@@ -22,6 +23,13 @@ namespace DotNetCore.CAP.Internal
/// </summary>
public string Name { get; }

/// <summary>
/// Defines wether this attribute defines a topic subscription partial.
/// The defined topic will be combined with a topic subscription defined on class level,
/// which results for example in subscription on "class.method".
/// </summary>
public bool IsPartial { get; }

/// <summary>
/// Default group name is CapOptions setting.(Assembly name)
/// kafka --> groups.id


+ 5
- 1
src/DotNetCore.CAP/Transport/MqLogType.cs Bestand weergeven

@@ -18,7 +18,11 @@ namespace DotNetCore.CAP.Transport
ServerConnError,

//AzureServiceBus
ExceptionReceived
ExceptionReceived,

//Amazon SQS
InvalidIdFormat,
MessageNotInflight
}

public class LogMessageEventArgs : EventArgs


+ 23
- 5
test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs Bestand weergeven

@@ -29,15 +29,18 @@ namespace DotNetCore.CAP.Test
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();

Assert.Equal(6, candidates.Count);
Assert.Equal(8, candidates.Count);
}

[Fact]
public void CanFindSpecifiedTopic()
[Theory]
[InlineData("Candidates.Foo")]
[InlineData("Candidates.Foo3")]
[InlineData("Candidates.Foo4")]
public void CanFindSpecifiedTopic(string topic)
{
var selector = _provider.GetRequiredService<IConsumerServiceSelector>();
var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate("Candidates.Foo", candidates);
var bestCandidates = selector.SelectBestCandidate(topic, candidates);

Assert.NotNull(bestCandidates);
Assert.NotNull(bestCandidates.MethodInfo);
@@ -116,7 +119,7 @@ namespace DotNetCore.CAP.Test

public class CandidatesTopic : TopicAttribute
{
public CandidatesTopic(string topicName) : base(topicName)
public CandidatesTopic(string topicName, bool isPartial = false) : base(topicName, isPartial)
{
}
}
@@ -129,6 +132,7 @@ namespace DotNetCore.CAP.Test
{
}

[CandidatesTopic("Candidates")]
public class CandidatesFooTest : IFooTest, ICapSubscribe
{
[CandidatesTopic("Candidates.Foo")]
@@ -144,6 +148,20 @@ namespace DotNetCore.CAP.Test
Console.WriteLine("GetFoo2() method has bee excuted.");
}

[CandidatesTopic("Foo3", isPartial: true)]
public Task GetFoo3()
{
Console.WriteLine("GetFoo3() method has bee excuted.");
return Task.CompletedTask;
}

[CandidatesTopic(".Foo4", isPartial: true)]
public Task GetFoo4()
{
Console.WriteLine("GetFoo4() method has bee excuted.");
return Task.CompletedTask;
}

[CandidatesTopic("*.*.Asterisk")]
[CandidatesTopic("*.Asterisk")]
public void GetFooAsterisk()


Laden…
Annuleren
Opslaan