diff --git a/.travis.yml b/.travis.yml index e319bee..1a12425 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,5 +12,5 @@ services: script: - export PATH="$PATH:$HOME/.dotnet/tools" - - dotnet tool install --global FlubuCore.GlobalTool --version 4.3.7 + - dotnet tool install --global FlubuCore.GlobalTool --version 5.1.1 - flubu build tests \ No newline at end of file diff --git a/README.md b/README.md index f72f435..78c23d9 100644 --- a/README.md +++ b/README.md @@ -227,7 +227,7 @@ services.AddCap(x => CAP v2.1+ provides the dashboard pages, you can easily view the sent and received messages. In addition, you can also view the message status in real time on the dashboard. Use the following command to install the Dashboard in your project. ``` -PM> Install-Package DotNetCore.Dashboard +PM> Install-Package DotNetCore.CAP.Dashboard ``` In the distributed environment, the dashboard built-in integrated [Consul](http://consul.io) as a node discovery, while the realization of the gateway agent function, you can also easily view the node or other node data, It's like you are visiting local resources. diff --git a/README.zh-cn.md b/README.zh-cn.md index f276310..f0e771f 100644 --- a/README.zh-cn.md +++ b/README.zh-cn.md @@ -242,7 +242,7 @@ CAP 2.1+ 以上版本中提供了仪表盘(Dashboard)功能,你可以很 使用一下命令安装 Dashboard: ``` -PM> Install-Package DotNetCore.Dashboard +PM> Install-Package DotNetCore.CAP.Dashboard ``` 在分布式环境中,仪表盘内置集成了 [Consul](http://consul.io) 作为节点的注册发现,同时实现了网关代理功能,你同样可以方便的查看本节点或者其他节点的数据,它就像你访问本地资源一样。 diff --git a/appveyor.yml b/appveyor.yml index 4df4474..2e8fe7e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,7 +7,7 @@ environment: services: - mysql before_build: - - ps: dotnet tool install --global FlubuCore.GlobalTool --version 4.3.7 + - ps: dotnet tool install --global FlubuCore.GlobalTool --version 5.1.1 build_script: - ps: flubu test: off diff --git a/build/BuildScript.cs b/build/BuildScript.cs index be7f0fa..54f897e 100644 --- a/build/BuildScript.cs +++ b/build/BuildScript.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using FlubuCore.Context; +using FlubuCore.Context.Attributes.BuildProperties; using FlubuCore.IO; using FlubuCore.Scripting; using FlubuCore.Scripting.Attributes; @@ -9,23 +10,20 @@ namespace BuildScript [Include("./build/BuildVersion.cs")] public partial class BuildScript : DefaultBuildScript { - protected string ArtifactsDir => RootDirectory.CombineWith("artifacts"); - [FromArg("c|configuration")] - public string Configuration { get; set; } + [BuildConfiguration] + public string Configuration { get; set; } = "Release"; + + [SolutionFileName] public string SolutionFileName { get; set; } = "CAP.sln"; protected BuildVersion BuildVersion { get; set; } + protected string ArtifactsDir => RootDirectory.CombineWith("artifacts"); + protected List ProjectFiles { get; set; } protected List TestProjectFiles { get; set; } - protected override void ConfigureBuildProperties(IBuildPropertiesContext context) - { - context.Properties.Set(BuildProps.SolutionFileName, "CAP.sln"); - context.Properties.Set(BuildProps.BuildConfiguration, string.IsNullOrEmpty(Configuration) ? "Release" : Configuration); - } - protected override void BeforeBuildExecution(ITaskContext context) { BuildVersion = FetchBuildVersion(context); diff --git a/build/BuildScript.csproj b/build/BuildScript.csproj index 44dc6cc..063bc46 100644 --- a/build/BuildScript.csproj +++ b/build/BuildScript.csproj @@ -9,7 +9,7 @@ - + diff --git a/docs/content/user-guide/zh/transports/azure-service-bus.md b/docs/content/user-guide/zh/transports/azure-service-bus.md index a44e354..ea12605 100644 --- a/docs/content/user-guide/zh/transports/azure-service-bus.md +++ b/docs/content/user-guide/zh/transports/azure-service-bus.md @@ -39,10 +39,10 @@ public void ConfigureServices(IServiceCollection services) #### AzureServiceBus Options -CAP 直接对外提供的 Kafka 配置参数如下: +CAP 直接对外提供的 Azure Service Bus 配置参数如下: NAME | DESCRIPTION | TYPE | DEFAULT :---|:---|---|:--- ConnectionString | Endpoint 地址 | string | TopicPath | Topic entity path | string | cap -ManagementTokenProvider | Token提供 | ITokenProvider | null \ No newline at end of file +ManagementTokenProvider | Token提供 | ITokenProvider | null diff --git a/samples/Sample.Kafka.InMemory/Startup.cs b/samples/Sample.Kafka.InMemory/Startup.cs index 281ef24..ae3ee2a 100644 --- a/samples/Sample.Kafka.InMemory/Startup.cs +++ b/samples/Sample.Kafka.InMemory/Startup.cs @@ -10,7 +10,7 @@ namespace Sample.Kafka.InMemory services.AddCap(x => { x.UseInMemoryStorage(); - x.UseKafka("192.168.2.120:9093"); + x.UseKafka("localhost:9092"); x.UseDashboard(); }); diff --git a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs index e5b22e5..0481e10 100644 --- a/samples/Sample.RabbitMQ.MySql/AppDbContext.cs +++ b/samples/Sample.RabbitMQ.MySql/AppDbContext.cs @@ -26,7 +26,7 @@ namespace Sample.RabbitMQ.MySql } public class AppDbContext : DbContext { - public const string ConnectionString = "Server=192.168.2.120;Database=captest;UserId=root;Password=123123;"; + public const string ConnectionString = "Server=localhost;Database=testcap;UserId=root;Password=123123;"; public DbSet Persons { get; set; } diff --git a/samples/Sample.RabbitMQ.MySql/Startup.cs b/samples/Sample.RabbitMQ.MySql/Startup.cs index 3b76a4b..6b31ef5 100644 --- a/samples/Sample.RabbitMQ.MySql/Startup.cs +++ b/samples/Sample.RabbitMQ.MySql/Startup.cs @@ -1,7 +1,7 @@ -using System; -using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Sample.RabbitMQ.MySql { @@ -14,13 +14,14 @@ namespace Sample.RabbitMQ.MySql services.AddCap(x => { x.UseEntityFramework(); - x.UseRabbitMQ("192.168.2.120"); + x.UseRabbitMQ("localhost"); x.UseDashboard(); x.FailedRetryCount = 5; - x.FailedThresholdCallback = (type, msg) => + x.FailedThresholdCallback = failed => { - Console.WriteLine( - $@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); + var logger = failed.ServiceProvider.GetService>(); + logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, + requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); }; }); diff --git a/samples/Sample.RabbitMQ.SqlServer/Startup.cs b/samples/Sample.RabbitMQ.SqlServer/Startup.cs index 38b91f7..a309637 100644 --- a/samples/Sample.RabbitMQ.SqlServer/Startup.cs +++ b/samples/Sample.RabbitMQ.SqlServer/Startup.cs @@ -2,6 +2,7 @@ using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; namespace Sample.RabbitMQ.SqlServer { @@ -17,10 +18,11 @@ namespace Sample.RabbitMQ.SqlServer x.UseRabbitMQ("192.168.2.120"); x.UseDashboard(); x.FailedRetryCount = 5; - x.FailedThresholdCallback = (type, msg) => + x.FailedThresholdCallback = failed => { - Console.WriteLine( - $@"A message of type {type} failed after executing {x.FailedRetryCount} several times, requiring manual troubleshooting. Message name: {msg.GetName()}"); + var logger = failed.ServiceProvider.GetService>(); + logger.LogError($@"A message of type {failed.MessageType} failed after executing {x.FailedRetryCount} several times, + requiring manual troubleshooting. Message name: {failed.Message.GetName()}"); }; }); diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs index e4de064..3af3d95 100644 --- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs +++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs @@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Kafka { internal sealed class KafkaConsumerClient : IConsumerClient { - private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); private readonly string _groupId; private readonly KafkaOptions _kafkaOptions; @@ -100,7 +100,7 @@ namespace DotNetCore.CAP.Kafka return; } - _connectionLock.Wait(); + ConnectionLock.Wait(); try { @@ -117,7 +117,7 @@ namespace DotNetCore.CAP.Kafka } finally { - _connectionLock.Release(); + ConnectionLock.Release(); } } diff --git a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs index 39c321e..d43bdbd 100644 --- a/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs +++ b/src/DotNetCore.CAP.RabbitMQ/ITransport.RabbitMQ.cs @@ -32,9 +32,10 @@ namespace DotNetCore.CAP.RabbitMQ public Task SendAsync(TransportMessage message) { - var channel = _connectionChannelPool.Rent(); + IModel channel = null; try { + channel = _connectionChannelPool.Rent(); var props = new BasicProperties { DeliveryMode = 2, @@ -62,12 +63,15 @@ namespace DotNetCore.CAP.RabbitMQ } finally { - var returned = _connectionChannelPool.Return(channel); - if (!returned) + if (channel != null) { - channel.Dispose(); + var returned = _connectionChannelPool.Return(channel); + if (!returned) + { + channel.Dispose(); + } } } - } + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index ea178d1..ebdbf91 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -53,7 +53,7 @@ namespace DotNetCore.CAP /// /// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals times. /// - public Action FailedThresholdCallback { get; set; } + public Action FailedThresholdCallback { get; set; } /// /// The number of message retries, the retry will stop when the threshold is reached. diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 2d2f70a..7e32a68 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -12,6 +12,7 @@ using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -19,14 +20,16 @@ namespace DotNetCore.CAP.Internal { internal class ConsumerRegister : IConsumerRegister { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IConsumerClientFactory _consumerClientFactory; private readonly IDispatcher _dispatcher; private readonly ISerializer _serializer; private readonly IDataStorage _storage; - private readonly ILogger _logger; + private readonly MethodMatcherCache _selector; private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); private readonly CapOptions _options; - private readonly MethodMatcherCache _selector; private CancellationTokenSource _cts; private BrokerAddress _serverAddress; @@ -39,21 +42,17 @@ namespace DotNetCore.CAP.Internal private static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName); - public ConsumerRegister(ILogger logger, - IOptions options, - MethodMatcherCache selector, - IConsumerClientFactory consumerClientFactory, - IDispatcher dispatcher, - ISerializer serializer, - IDataStorage storage) + public ConsumerRegister(ILogger logger, IServiceProvider serviceProvider) { - _options = options.Value; - _selector = selector; _logger = logger; - _consumerClientFactory = consumerClientFactory; - _dispatcher = dispatcher; - _serializer = serializer; - _storage = storage; + _serviceProvider = serviceProvider; + + _options = serviceProvider.GetService>().Value; + _selector = serviceProvider.GetService(); + _consumerClientFactory = serviceProvider.GetService(); + _dispatcher = serviceProvider.GetService(); + _serializer = serviceProvider.GetService(); + _storage = serviceProvider.GetService(); _cts = new CancellationTokenSource(); } @@ -202,6 +201,22 @@ namespace DotNetCore.CAP.Internal client.Commit(sender); + try + { + _options.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _serviceProvider, + MessageType = MessageType.Subscribe, + Message = message + }); + + _logger.ConsumerExecutedAfterThreshold(message.GetId(), _options.FailedRetryCount); + } + catch (Exception e) + { + _logger.ExecutedThresholdCallbackFailed(e); + } + TracingAfter(tracingTimestamp, transportMessage, _serverAddress); } else diff --git a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs index d256b2d..39c619c 100644 --- a/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs +++ b/src/DotNetCore.CAP/Internal/IMessageSender.Default.cs @@ -9,6 +9,7 @@ using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; using DotNetCore.CAP.Serialization; using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,10 +17,12 @@ namespace DotNetCore.CAP.Internal { internal class MessageSender : IMessageSender { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IDataStorage _dataStorage; private readonly ISerializer _serializer; private readonly ITransport _transport; - private readonly ILogger _logger; private readonly IOptions _options; // ReSharper disable once InconsistentNaming @@ -28,16 +31,15 @@ namespace DotNetCore.CAP.Internal public MessageSender( ILogger logger, - IOptions options, - IDataStorage dataStorage, - ISerializer serializer, - ITransport transport) + IServiceProvider serviceProvider) { - _options = options; - _dataStorage = dataStorage; - _serializer = serializer; - _transport = transport; _logger = logger; + _serviceProvider = serviceProvider; + + _options = serviceProvider.GetService>(); + _dataStorage = serviceProvider.GetService(); + _serializer = serviceProvider.GetService(); + _transport = serviceProvider.GetService(); } public async Task SendAsync(MediumMessage message) @@ -111,7 +113,12 @@ namespace DotNetCore.CAP.Internal { try { - _options.Value.FailedThresholdCallback?.Invoke(MessageType.Publish, message.Origin); + _options.Value.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _serviceProvider, + MessageType = MessageType.Publish, + Message = message.Origin + }); _logger.SenderAfterThreshold(message.DbId, _options.Value.FailedRetryCount); } diff --git a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index a538a7e..1d1b9b3 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -145,7 +145,12 @@ namespace DotNetCore.CAP.Internal { try { - _options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Origin); + _options.FailedThresholdCallback?.Invoke(new FailedInfo + { + ServiceProvider = _provider, + MessageType = MessageType.Subscribe, + Message = message.Origin + }); _logger.ConsumerExecutedAfterThreshold(message.DbId, _options.FailedRetryCount); } diff --git a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs index ca16741..2fb19cb 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; +using System.ComponentModel; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -60,7 +61,8 @@ namespace DotNetCore.CAP.Internal } else { - executeParameters[i] = message.Value; + var converter = TypeDescriptor.GetConverter(parameterDescriptors[i].ParameterType); + executeParameters[i] = converter.ConvertFrom(message.Value); } } } diff --git a/src/DotNetCore.CAP/Messages/FailedInfo.cs b/src/DotNetCore.CAP/Messages/FailedInfo.cs new file mode 100644 index 0000000..501307e --- /dev/null +++ b/src/DotNetCore.CAP/Messages/FailedInfo.cs @@ -0,0 +1,13 @@ +using System; + +namespace DotNetCore.CAP.Messages +{ + public class FailedInfo + { + public IServiceProvider ServiceProvider { get; set; } + + public MessageType MessageType { get; set; } + + public Message Message { get; set; } + } +}