@@ -35,3 +35,7 @@ bin/ | |||
/.idea | |||
Properties | |||
/pack.bat | |||
/src/DotNetCore.CAP/project.json | |||
/src/DotNetCore.CAP/packages.config | |||
/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj | |||
/NuGet.config |
@@ -2,7 +2,7 @@ | |||
<PropertyGroup> | |||
<VersionMajor>2</VersionMajor> | |||
<VersionMinor>0</VersionMinor> | |||
<VersionPatch>0</VersionPatch> | |||
<VersionPatch>1</VersionPatch> | |||
<VersionQuality></VersionQuality> | |||
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix> | |||
</PropertyGroup> | |||
@@ -1,4 +1,5 @@ | |||
using System.IO; | |||
using Microsoft.AspNetCore; | |||
using Microsoft.AspNetCore.Builder; | |||
using Microsoft.AspNetCore.Hosting; | |||
using Microsoft.Extensions.Configuration; | |||
@@ -7,22 +8,30 @@ namespace Sample.RabbitMQ.SqlServer | |||
{ | |||
public class Program | |||
{ | |||
//var config = new ConfigurationBuilder() | |||
// .AddCommandLine(args) | |||
// .AddEnvironmentVariables("ASPNETCORE_") | |||
// .Build(); | |||
//var host = new WebHostBuilder() | |||
// .UseConfiguration(config) | |||
// .UseKestrel() | |||
// .UseContentRoot(Directory.GetCurrentDirectory()) | |||
// .UseIISIntegration() | |||
// .UseStartup<Startup>() | |||
// .Build(); | |||
//host.Run(); | |||
public static void Main(string[] args) | |||
{ | |||
var config = new ConfigurationBuilder() | |||
.AddCommandLine(args) | |||
.AddEnvironmentVariables("ASPNETCORE_") | |||
.Build(); | |||
BuildWebHost(args).Run(); | |||
} | |||
var host = new WebHostBuilder() | |||
.UseConfiguration(config) | |||
.UseKestrel() | |||
.UseContentRoot(Directory.GetCurrentDirectory()) | |||
.UseIISIntegration() | |||
public static IWebHost BuildWebHost(string[] args) => | |||
WebHost.CreateDefaultBuilder(args) | |||
.UseStartup<Startup>() | |||
.Build(); | |||
host.Run(); | |||
} | |||
} | |||
} |
@@ -21,7 +21,7 @@ namespace DotNetCore.CAP | |||
services.AddSingleton(kafkaOptions); | |||
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); | |||
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | |||
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); | |||
} | |||
} | |||
} |
@@ -23,9 +23,8 @@ namespace DotNetCore.CAP | |||
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); | |||
services.AddSingleton<ConnectionPool>(); | |||
services.AddScoped(x => x.GetService<ConnectionPool>().Rent()); | |||
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | |||
services.AddSingleton<IQueueExecutor, PublishQueueExecutor>(); | |||
} | |||
} | |||
} |
@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
{ | |||
public class ConnectionPool : IConnectionPool, IDisposable | |||
{ | |||
private const int DefaultPoolSize = 32; | |||
private const int DefaultPoolSize = 15; | |||
private readonly ConcurrentQueue<IConnection> _pool = new ConcurrentQueue<IConnection>(); | |||
@@ -10,27 +10,29 @@ namespace DotNetCore.CAP.RabbitMQ | |||
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly IConnection _connection; | |||
private readonly ConnectionPool _connectionPool; | |||
private readonly RabbitMQOptions _rabbitMQOptions; | |||
public PublishQueueExecutor( | |||
CapOptions options, | |||
IStateChanger stateChanger, | |||
IConnection connection, | |||
ConnectionPool connectionPool, | |||
RabbitMQOptions rabbitMQOptions, | |||
ILogger<PublishQueueExecutor> logger) | |||
: base(options, stateChanger, logger) | |||
{ | |||
_logger = logger; | |||
_connection = connection; | |||
_connectionPool = connectionPool; | |||
_rabbitMQOptions = rabbitMQOptions; | |||
} | |||
public override Task<OperateResult> PublishAsync(string keyName, string content) | |||
{ | |||
var connection = _connectionPool.Rent(); | |||
try | |||
{ | |||
using (var channel = _connection.CreateModel()) | |||
using (var channel = connection.CreateModel()) | |||
{ | |||
var body = Encoding.UTF8.GetBytes(content); | |||
@@ -55,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ | |||
Description = ex.Message | |||
})); | |||
} | |||
finally | |||
{ | |||
_connectionPool.Return(connection); | |||
} | |||
} | |||
} | |||
} |
@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private readonly string _queueName; | |||
private readonly RabbitMQOptions _rabbitMQOptions; | |||
private IConnection _connection; | |||
private ConnectionPool _connectionPool; | |||
private IModel _channel; | |||
private ulong _deliveryTag; | |||
@@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ | |||
public event EventHandler<string> OnError; | |||
public RabbitMQConsumerClient(string queueName, | |||
IConnection connection, | |||
ConnectionPool connectionPool, | |||
RabbitMQOptions options) | |||
{ | |||
_queueName = queueName; | |||
_connection = connection; | |||
_connectionPool = connectionPool; | |||
_rabbitMQOptions = options; | |||
_exchageName = options.TopicExchangeName; | |||
@@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private void InitClient() | |||
{ | |||
_channel = _connection.CreateModel(); | |||
var connection = _connectionPool.Rent(); | |||
_channel = connection.CreateModel(); | |||
_channel.ExchangeDeclare( | |||
exchange: _exchageName, | |||
@@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||
exclusive: false, | |||
autoDelete: false, | |||
arguments: arguments); | |||
_connectionPool.Return(connection); | |||
} | |||
public void Subscribe(IEnumerable<string> topics) | |||
@@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ | |||
public void Dispose() | |||
{ | |||
_channel.Dispose(); | |||
_connection.Dispose(); | |||
} | |||
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) | |||
@@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ | |||
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory | |||
{ | |||
private readonly RabbitMQOptions _rabbitMQOptions; | |||
private readonly IConnection _connection; | |||
private readonly ConnectionPool _connectionPool; | |||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection) | |||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool) | |||
{ | |||
_rabbitMQOptions = rabbitMQOptions; | |||
_connection = connection; | |||
_connectionPool = pool; | |||
} | |||
public IConsumerClient Create(string groupId) | |||
{ | |||
return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions); | |||
return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions); | |||
} | |||
} | |||
} |
@@ -19,11 +19,15 @@ namespace DotNetCore.CAP | |||
public void AddServices(IServiceCollection services) | |||
{ | |||
services.AddSingleton<IStorage, SqlServerStorage>(); | |||
services.AddScoped<IStorageConnection, SqlServerStorageConnection>(); | |||
services.AddScoped<ICapPublisher, CapPublisher>(); | |||
services.AddSingleton<IStorageConnection, SqlServerStorageConnection>(); | |||
services.AddTransient<ICapPublisher, CapPublisher>(); | |||
services.AddTransient<ICallbackPublisher, CapPublisher>(); | |||
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>(); | |||
AddSqlServerOptions(services); | |||
} | |||
private void AddSqlServerOptions(IServiceCollection services) | |||
{ | |||
var sqlServerOptions = new SqlServerOptions(); | |||
_configure(sqlServerOptions); | |||
@@ -32,9 +36,13 @@ namespace DotNetCore.CAP | |||
{ | |||
services.AddSingleton(x => | |||
{ | |||
var dbContext = (DbContext)x.GetService(sqlServerOptions.DbContextType); | |||
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | |||
return sqlServerOptions; | |||
using (var scope = x.CreateScope()) | |||
{ | |||
var provider = scope.ServiceProvider; | |||
var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType); | |||
sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | |||
return sqlServerOptions; | |||
} | |||
}); | |||
} | |||
else | |||
@@ -11,10 +11,9 @@ namespace DotNetCore.CAP.Abstractions | |||
/// <summary> | |||
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with | |||
/// <paramref name="provider"/>. | |||
/// </summary> | |||
/// <param name="provider"> <see cref="IServiceProvider"/>.</param> | |||
/// </summary> | |||
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns> | |||
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider); | |||
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(); | |||
/// <summary> | |||
/// Selects the best <see cref="ConsumerExecutorDescriptor"/> candidate from <paramref name="candidates"/> for the | |||
@@ -82,15 +82,6 @@ namespace Microsoft.Extensions.DependencyInjection | |||
{ | |||
services.AddTransient(service.Key, service.Value); | |||
} | |||
var types = Assembly.GetEntryAssembly().ExportedTypes; | |||
foreach (var type in types) | |||
{ | |||
if (Helper.IsController(type.GetTypeInfo())) | |||
{ | |||
services.AddTransient(typeof(object), type); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -25,14 +25,13 @@ namespace DotNetCore.CAP | |||
IOptions<CapOptions> options, | |||
IStorage storage, | |||
IApplicationLifetime appLifetime, | |||
IServiceProvider provider) | |||
IEnumerable<IProcessingServer> servers) | |||
{ | |||
_logger = logger; | |||
_appLifetime = appLifetime; | |||
Options = options.Value; | |||
Storage = storage; | |||
Provider = provider; | |||
Servers = Provider.GetServices<IProcessingServer>(); | |||
Servers = servers; | |||
_cts = new CancellationTokenSource(); | |||
_ctsRegistration = appLifetime.ApplicationStopping.Register(() => | |||
@@ -55,8 +54,6 @@ namespace DotNetCore.CAP | |||
protected IEnumerable<IProcessingServer> Servers { get; } | |||
public IServiceProvider Provider { get; private set; } | |||
public Task BootstrapAsync() | |||
{ | |||
return (_bootstrappingTask = BootstrapTaskAsync()); | |||
@@ -47,7 +47,7 @@ namespace DotNetCore.CAP | |||
public void Start() | |||
{ | |||
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider); | |||
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(); | |||
foreach (var matchGroup in groupingMatchs) | |||
{ | |||
@@ -1,6 +1,5 @@ | |||
using System; | |||
using DotNetCore.CAP.Abstractions; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.Internal | |||
@@ -23,15 +22,13 @@ namespace DotNetCore.CAP.Internal | |||
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext) | |||
{ | |||
using (var scope = _serviceProvider.CreateScope()) | |||
var context = new ConsumerInvokerContext(consumerContext) | |||
{ | |||
var context = new ConsumerInvokerContext(consumerContext) | |||
{ | |||
Result = new DefaultConsumerInvoker(_logger, scope.ServiceProvider, _modelBinderFactory, consumerContext) | |||
}; | |||
Result = new DefaultConsumerInvoker(_logger, _serviceProvider, | |||
_modelBinderFactory, consumerContext) | |||
}; | |||
return context.Result; | |||
} | |||
return context.Result; | |||
} | |||
} | |||
} |
@@ -35,25 +35,29 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name); | |||
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, | |||
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); | |||
using (var scope = _serviceProvider.CreateScope()) | |||
{ | |||
var provider = scope.ServiceProvider; | |||
var serviceType = _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType(); | |||
var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType); | |||
var jsonConent = _consumerContext.DeliverMessage.Content; | |||
var message = Helper.FromJson<Message>(jsonConent); | |||
var jsonConent = _consumerContext.DeliverMessage.Content; | |||
var message = Helper.FromJson<Message>(jsonConent); | |||
object result = null; | |||
if (_executor.MethodParameters.Length > 0) | |||
{ | |||
result = await ExecuteWithParameterAsync(obj, message.Content.ToString()); | |||
} | |||
else | |||
{ | |||
result = await ExecuteAsync(obj); | |||
} | |||
object result = null; | |||
if (_executor.MethodParameters.Length > 0) | |||
{ | |||
result = await ExecuteWithParameterAsync(obj, message.Content.ToString()); | |||
} | |||
else | |||
{ | |||
result = await ExecuteAsync(obj); | |||
} | |||
if (!string.IsNullOrEmpty(message.CallbackName)) | |||
{ | |||
await SentCallbackMessage(message.Id, message.CallbackName, result); | |||
if (!string.IsNullOrEmpty(message.CallbackName)) | |||
{ | |||
await SentCallbackMessage(message.Id, message.CallbackName, result); | |||
} | |||
} | |||
} | |||
@@ -33,13 +33,13 @@ namespace DotNetCore.CAP.Internal | |||
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key); | |||
} | |||
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(IServiceProvider provider) | |||
public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates() | |||
{ | |||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | |||
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(provider)); | |||
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider)); | |||
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(provider)); | |||
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(_serviceProvider)); | |||
return executorDescriptorList; | |||
} | |||
@@ -48,35 +48,38 @@ namespace DotNetCore.CAP.Internal | |||
IServiceProvider provider) | |||
{ | |||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | |||
var consumerServices = provider.GetServices<ICapSubscribe>(); | |||
foreach (var service in consumerServices) | |||
using (var scoped = provider.CreateScope()) | |||
{ | |||
var typeInfo = service.GetType().GetTypeInfo(); | |||
if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo)) | |||
var scopedProvider = scoped.ServiceProvider; | |||
var consumerServices = scopedProvider.GetServices<ICapSubscribe>(); | |||
foreach (var service in consumerServices) | |||
{ | |||
continue; | |||
} | |||
var typeInfo = service.GetType().GetTypeInfo(); | |||
if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo)) | |||
{ | |||
continue; | |||
} | |||
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); | |||
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); | |||
} | |||
return executorDescriptorList; | |||
} | |||
return executorDescriptorList; | |||
} | |||
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes( | |||
IServiceProvider provider) | |||
{ | |||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | |||
// at cap startup time, find all Controller into the DI container,the type is object. | |||
var controllers = provider.GetServices<object>(); | |||
foreach (var controller in controllers) | |||
{ | |||
var typeInfo = controller.GetType().GetTypeInfo(); | |||
//double check | |||
if (!Helper.IsController(typeInfo)) continue; | |||
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); | |||
var types = Assembly.GetEntryAssembly().ExportedTypes; | |||
foreach (var type in types) | |||
{ | |||
var typeInfo = type.GetTypeInfo(); | |||
if (Helper.IsController(typeInfo)) | |||
{ | |||
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo)); | |||
} | |||
} | |||
return executorDescriptorList; | |||
@@ -22,12 +22,11 @@ namespace DotNetCore.CAP.Internal | |||
/// Get a dictionary of candidates.In the dictionary, | |||
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates | |||
/// </summary> | |||
/// <param name="provider"><see cref="IServiceProvider"/></param> | |||
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped(IServiceProvider provider) | |||
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped() | |||
{ | |||
if (Entries.Count != 0) return Entries; | |||
var executorCollection = _selector.SelectCandidates(provider); | |||
var executorCollection = _selector.SelectCandidates(); | |||
var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group); | |||
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.Test | |||
public void CanFindAllConsumerService() | |||
{ | |||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||
var candidates = selector.SelectCandidates(_provider); | |||
var candidates = selector.SelectCandidates(); | |||
Assert.Equal(2, candidates.Count); | |||
} | |||
@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Test | |||
public void CanFindSpecifiedTopic() | |||
{ | |||
var selector = _provider.GetRequiredService<IConsumerServiceSelector>(); | |||
var candidates = selector.SelectCandidates(_provider); | |||
var candidates = selector.SelectCandidates(); | |||
var bestCandidates = selector.SelectBestCandidate("Candidates.Foo", candidates); | |||
Assert.NotNull(bestCandidates); | |||