diff --git a/.gitignore b/.gitignore
index d7fe0b1..af4f8ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,7 @@ bin/
/.idea
Properties
/pack.bat
+/src/DotNetCore.CAP/project.json
+/src/DotNetCore.CAP/packages.config
+/src/DotNetCore.CAP/DotNetCore.CAP.Net47.csproj
+/NuGet.config
diff --git a/build/version.props b/build/version.props
index 604d215..6c03b29 100644
--- a/build/version.props
+++ b/build/version.props
@@ -2,7 +2,7 @@
2
0
- 0
+ 1
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/samples/Sample.RabbitMQ.SqlServer/Program.cs b/samples/Sample.RabbitMQ.SqlServer/Program.cs
index 2393f73..ed4ff09 100644
--- a/samples/Sample.RabbitMQ.SqlServer/Program.cs
+++ b/samples/Sample.RabbitMQ.SqlServer/Program.cs
@@ -1,4 +1,5 @@
using System.IO;
+using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
@@ -7,22 +8,30 @@ namespace Sample.RabbitMQ.SqlServer
{
public class Program
{
+
+ //var config = new ConfigurationBuilder()
+ // .AddCommandLine(args)
+ // .AddEnvironmentVariables("ASPNETCORE_")
+ // .Build();
+
+ //var host = new WebHostBuilder()
+ // .UseConfiguration(config)
+ // .UseKestrel()
+ // .UseContentRoot(Directory.GetCurrentDirectory())
+ // .UseIISIntegration()
+ // .UseStartup()
+ // .Build();
+
+ //host.Run();
public static void Main(string[] args)
{
- var config = new ConfigurationBuilder()
- .AddCommandLine(args)
- .AddEnvironmentVariables("ASPNETCORE_")
- .Build();
+ BuildWebHost(args).Run();
+ }
- var host = new WebHostBuilder()
- .UseConfiguration(config)
- .UseKestrel()
- .UseContentRoot(Directory.GetCurrentDirectory())
- .UseIISIntegration()
+ public static IWebHost BuildWebHost(string[] args) =>
+ WebHost.CreateDefaultBuilder(args)
.UseStartup()
.Build();
- host.Run();
- }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
index 8d9bf98..5c9b6ba 100644
--- a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
@@ -21,7 +21,7 @@ namespace DotNetCore.CAP
services.AddSingleton(kafkaOptions);
services.AddSingleton();
- services.AddTransient();
+ services.AddSingleton();
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
index 22e2af2..abc6d2b 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
@@ -23,9 +23,8 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddSingleton();
- services.AddScoped(x => x.GetService().Rent());
- services.AddTransient();
+ services.AddSingleton();
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
index 83116bf..5db81e7 100644
--- a/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/ConnectionPool.cs
@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.RabbitMQ
{
public class ConnectionPool : IConnectionPool, IDisposable
{
- private const int DefaultPoolSize = 32;
+ private const int DefaultPoolSize = 15;
private readonly ConcurrentQueue _pool = new ConcurrentQueue();
diff --git a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
index 791985d..0a47501 100644
--- a/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/PublishQueueExecutor.cs
@@ -10,27 +10,29 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor
{
private readonly ILogger _logger;
- private readonly IConnection _connection;
+ private readonly ConnectionPool _connectionPool;
private readonly RabbitMQOptions _rabbitMQOptions;
public PublishQueueExecutor(
CapOptions options,
IStateChanger stateChanger,
- IConnection connection,
+ ConnectionPool connectionPool,
RabbitMQOptions rabbitMQOptions,
ILogger logger)
: base(options, stateChanger, logger)
{
_logger = logger;
- _connection = connection;
+ _connectionPool = connectionPool;
_rabbitMQOptions = rabbitMQOptions;
}
public override Task PublishAsync(string keyName, string content)
{
+ var connection = _connectionPool.Rent();
+
try
{
- using (var channel = _connection.CreateModel())
+ using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);
@@ -55,6 +57,10 @@ namespace DotNetCore.CAP.RabbitMQ
Description = ex.Message
}));
}
+ finally
+ {
+ _connectionPool.Return(connection);
+ }
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
index a6720fd..0172c2b 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
- private IConnection _connection;
+ private ConnectionPool _connectionPool;
private IModel _channel;
private ulong _deliveryTag;
@@ -23,11 +23,11 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler OnError;
public RabbitMQConsumerClient(string queueName,
- IConnection connection,
+ ConnectionPool connectionPool,
RabbitMQOptions options)
{
_queueName = queueName;
- _connection = connection;
+ _connectionPool = connectionPool;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;
@@ -36,7 +36,9 @@ namespace DotNetCore.CAP.RabbitMQ
private void InitClient()
{
- _channel = _connection.CreateModel();
+ var connection = _connectionPool.Rent();
+
+ _channel = connection.CreateModel();
_channel.ExchangeDeclare(
exchange: _exchageName,
@@ -49,6 +51,8 @@ namespace DotNetCore.CAP.RabbitMQ
exclusive: false,
autoDelete: false,
arguments: arguments);
+
+ _connectionPool.Return(connection);
}
public void Subscribe(IEnumerable topics)
@@ -81,7 +85,6 @@ namespace DotNetCore.CAP.RabbitMQ
public void Dispose()
{
_channel.Dispose();
- _connection.Dispose();
}
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
index 5fc9d8f..753fc05 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
@@ -6,18 +6,18 @@ namespace DotNetCore.CAP.RabbitMQ
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly RabbitMQOptions _rabbitMQOptions;
- private readonly IConnection _connection;
+ private readonly ConnectionPool _connectionPool;
- public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnection connection)
+ public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, ConnectionPool pool)
{
_rabbitMQOptions = rabbitMQOptions;
- _connection = connection;
+ _connectionPool = pool;
}
public IConsumerClient Create(string groupId)
{
- return new RabbitMQConsumerClient(groupId, _connection, _rabbitMQOptions);
+ return new RabbitMQConsumerClient(groupId, _connectionPool, _rabbitMQOptions);
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
index 3053d76..f928031 100644
--- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
@@ -19,11 +19,15 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton();
- services.AddScoped();
- services.AddScoped();
+ services.AddSingleton();
+ services.AddTransient();
services.AddTransient();
services.AddTransient();
+ AddSqlServerOptions(services);
+ }
+ private void AddSqlServerOptions(IServiceCollection services)
+ {
var sqlServerOptions = new SqlServerOptions();
_configure(sqlServerOptions);
@@ -32,9 +36,13 @@ namespace DotNetCore.CAP
{
services.AddSingleton(x =>
{
- var dbContext = (DbContext)x.GetService(sqlServerOptions.DbContextType);
- sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
- return sqlServerOptions;
+ using (var scope = x.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ var dbContext = (DbContext)provider.GetService(sqlServerOptions.DbContextType);
+ sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
+ return sqlServerOptions;
+ }
});
}
else
diff --git a/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs b/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs
index b9e1e03..e7985d7 100644
--- a/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs
+++ b/src/DotNetCore.CAP/Abstractions/IConsumerServiceSelector.cs
@@ -11,10 +11,9 @@ namespace DotNetCore.CAP.Abstractions
///
/// Selects a set of candidates for the current message associated with
/// .
- ///
- /// .
+ ///
/// A set of candidates or null.
- IReadOnlyList SelectCandidates(IServiceProvider provider);
+ IReadOnlyList SelectCandidates();
///
/// Selects the best candidate from for the
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
index 06b8249..51f645b 100644
--- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
+++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
@@ -82,15 +82,6 @@ namespace Microsoft.Extensions.DependencyInjection
{
services.AddTransient(service.Key, service.Value);
}
-
- var types = Assembly.GetEntryAssembly().ExportedTypes;
- foreach (var type in types)
- {
- if (Helper.IsController(type.GetTypeInfo()))
- {
- services.AddTransient(typeof(object), type);
- }
- }
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IBootstrapper.Default.cs b/src/DotNetCore.CAP/IBootstrapper.Default.cs
index 195916b..e4a75d5 100644
--- a/src/DotNetCore.CAP/IBootstrapper.Default.cs
+++ b/src/DotNetCore.CAP/IBootstrapper.Default.cs
@@ -25,14 +25,13 @@ namespace DotNetCore.CAP
IOptions options,
IStorage storage,
IApplicationLifetime appLifetime,
- IServiceProvider provider)
+ IEnumerable servers)
{
_logger = logger;
_appLifetime = appLifetime;
Options = options.Value;
Storage = storage;
- Provider = provider;
- Servers = Provider.GetServices();
+ Servers = servers;
_cts = new CancellationTokenSource();
_ctsRegistration = appLifetime.ApplicationStopping.Register(() =>
@@ -55,8 +54,6 @@ namespace DotNetCore.CAP
protected IEnumerable Servers { get; }
- public IServiceProvider Provider { get; private set; }
-
public Task BootstrapAsync()
{
return (_bootstrappingTask = BootstrapTaskAsync());
diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
index 2990e8b..4e69e12 100644
--- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
@@ -47,7 +47,7 @@ namespace DotNetCore.CAP
public void Start()
{
- var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
+ var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped();
foreach (var matchGroup in groupingMatchs)
{
diff --git a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
index cdcb6d6..1d2f5df 100644
--- a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
+++ b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
@@ -1,6 +1,5 @@
using System;
using DotNetCore.CAP.Abstractions;
-using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Internal
@@ -23,15 +22,13 @@ namespace DotNetCore.CAP.Internal
public IConsumerInvoker CreateInvoker(ConsumerContext consumerContext)
{
- using (var scope = _serviceProvider.CreateScope())
+ var context = new ConsumerInvokerContext(consumerContext)
{
- var context = new ConsumerInvokerContext(consumerContext)
- {
- Result = new DefaultConsumerInvoker(_logger, scope.ServiceProvider, _modelBinderFactory, consumerContext)
- };
+ Result = new DefaultConsumerInvoker(_logger, _serviceProvider,
+ _modelBinderFactory, consumerContext)
+ };
- return context.Result;
- }
+ return context.Result;
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
index 4118626..0f408be 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
@@ -35,25 +35,29 @@ namespace DotNetCore.CAP.Internal
{
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);
- var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
- _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
+ using (var scope = _serviceProvider.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ var serviceType = _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType();
+ var obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, serviceType);
- var jsonConent = _consumerContext.DeliverMessage.Content;
- var message = Helper.FromJson(jsonConent);
+ var jsonConent = _consumerContext.DeliverMessage.Content;
+ var message = Helper.FromJson(jsonConent);
- object result = null;
- if (_executor.MethodParameters.Length > 0)
- {
- result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
- }
- else
- {
- result = await ExecuteAsync(obj);
- }
+ object result = null;
+ if (_executor.MethodParameters.Length > 0)
+ {
+ result = await ExecuteWithParameterAsync(obj, message.Content.ToString());
+ }
+ else
+ {
+ result = await ExecuteAsync(obj);
+ }
- if (!string.IsNullOrEmpty(message.CallbackName))
- {
- await SentCallbackMessage(message.Id, message.CallbackName, result);
+ if (!string.IsNullOrEmpty(message.CallbackName))
+ {
+ await SentCallbackMessage(message.Id, message.CallbackName, result);
+ }
}
}
diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
index 4746b5c..9dd0d18 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
@@ -33,13 +33,13 @@ namespace DotNetCore.CAP.Internal
return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
}
- public IReadOnlyList SelectCandidates(IServiceProvider provider)
+ public IReadOnlyList SelectCandidates()
{
var executorDescriptorList = new List();
- executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(provider));
+ executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider));
- executorDescriptorList.AddRange(FindConsumersFromControllerTypes(provider));
+ executorDescriptorList.AddRange(FindConsumersFromControllerTypes(_serviceProvider));
return executorDescriptorList;
}
@@ -48,35 +48,38 @@ namespace DotNetCore.CAP.Internal
IServiceProvider provider)
{
var executorDescriptorList = new List();
-
- var consumerServices = provider.GetServices();
- foreach (var service in consumerServices)
+
+ using (var scoped = provider.CreateScope())
{
- var typeInfo = service.GetType().GetTypeInfo();
- if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo))
+ var scopedProvider = scoped.ServiceProvider;
+ var consumerServices = scopedProvider.GetServices();
+ foreach (var service in consumerServices)
{
- continue;
- }
+ var typeInfo = service.GetType().GetTypeInfo();
+ if (!typeof(ICapSubscribe).GetTypeInfo().IsAssignableFrom(typeInfo))
+ {
+ continue;
+ }
- executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
+ executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
+ }
+ return executorDescriptorList;
}
- return executorDescriptorList;
}
private static IEnumerable FindConsumersFromControllerTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List();
- // at cap startup time, find all Controller into the DI container,the type is object.
- var controllers = provider.GetServices
- ///
- public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped(IServiceProvider provider)
+ public ConcurrentDictionary> GetCandidatesMethodsOfGroupNameGrouped()
{
if (Entries.Count != 0) return Entries;
- var executorCollection = _selector.SelectCandidates(provider);
+ var executorCollection = _selector.SelectCandidates();
var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group);
diff --git a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
index 5ed56e1..7704e88 100644
--- a/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
+++ b/test/DotNetCore.CAP.Test/ConsumerServiceSelectorTest.cs
@@ -25,7 +25,7 @@ namespace DotNetCore.CAP.Test
public void CanFindAllConsumerService()
{
var selector = _provider.GetRequiredService();
- var candidates = selector.SelectCandidates(_provider);
+ var candidates = selector.SelectCandidates();
Assert.Equal(2, candidates.Count);
}
@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Test
public void CanFindSpecifiedTopic()
{
var selector = _provider.GetRequiredService();
- var candidates = selector.SelectCandidates(_provider);
+ var candidates = selector.SelectCandidates();
var bestCandidates = selector.SelectBestCandidate("Candidates.Foo", candidates);
Assert.NotNull(bestCandidates);