diff --git a/CAP.sln b/CAP.sln
index d0d9280..98c03d2 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.26730.15
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29025.244
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
@@ -68,8 +68,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.InMemoryStor
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.InMemory", "samples\Sample.AzureServiceBus.InMemory\Sample.AzureServiceBus.InMemory.csproj", "{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.CastleCoreTest", "test\DotNetCore.CAP.CastleCoreTest\DotNetCore.CAP.CastleCoreTest.csproj", "{BA499B87-77A9-43A2-98A3-89ECF5034E26}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -144,10 +142,6 @@ Global
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8}.Release|Any CPU.Build.0 = Release|Any CPU
- {BA499B87-77A9-43A2-98A3-89ECF5034E26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {BA499B87-77A9-43A2-98A3-89ECF5034E26}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {BA499B87-77A9-43A2-98A3-89ECF5034E26}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {BA499B87-77A9-43A2-98A3-89ECF5034E26}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -171,7 +165,6 @@ Global
{63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{58B6E829-C6C8-457C-9DD0-C600650254DF} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{1E1E959C-3D0E-45C3-ABCA-DAAACE68AAB8} = {3A6B6931-A123-477A-9469-8B468B5385AF}
- {BA499B87-77A9-43A2-98A3-89ECF5034E26} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
diff --git a/CAP.sln.DotSettings b/CAP.sln.DotSettings
new file mode 100644
index 0000000..3eef960
--- /dev/null
+++ b/CAP.sln.DotSettings
@@ -0,0 +1,4 @@
+
+ DB
+ True
+ True
\ No newline at end of file
diff --git a/build/version.props b/build/version.props
index 790099b..0550ced 100644
--- a/build/version.props
+++ b/build/version.props
@@ -1,8 +1,8 @@
2
- 5
- 2
+ 6
+ 0
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
index d07bc2e..7f61a74 100644
--- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
+++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs
@@ -10,11 +10,14 @@ using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClient : IConsumerClient
{
+ private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
private readonly ILogger _logger;
private readonly string _subscriptionName;
private readonly AzureServiceBusOptions _asbOptions;
@@ -26,13 +29,11 @@ namespace DotNetCore.CAP.AzureServiceBus
public AzureServiceBusConsumerClient(
ILogger logger,
string subscriptionName,
- AzureServiceBusOptions options)
+ IOptions options)
{
_logger = logger;
_subscriptionName = subscriptionName;
- _asbOptions = options ?? throw new ArgumentNullException(nameof(options));
-
- InitAzureServiceBusClient().GetAwaiter().GetResult();
+ _asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
public event EventHandler OnMessageReceived;
@@ -48,6 +49,8 @@ namespace DotNetCore.CAP.AzureServiceBus
throw new ArgumentNullException(nameof(topics));
}
+ ConnectAsync().GetAwaiter().GetResult();
+
var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name);
foreach (var newRule in topics.Except(allRuleNames))
@@ -73,6 +76,8 @@ namespace DotNetCore.CAP.AzureServiceBus
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
+ ConnectAsync().GetAwaiter().GetResult();
+
_consumerClient.RegisterMessageHandler(OnConsumerReceived,
new MessageHandlerOptions(OnExceptionReceived)
{
@@ -106,33 +111,50 @@ namespace DotNetCore.CAP.AzureServiceBus
#region private methods
- private async Task InitAzureServiceBusClient()
+ private async Task ConnectAsync()
{
- ManagementClient mClient;
- if (_asbOptions.ManagementTokenProvider != null)
- {
- mClient = new ManagementClient(new ServiceBusConnectionStringBuilder(
- _asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider);
- }
- else
+ if (_consumerClient != null)
{
- mClient = new ManagementClient(_asbOptions.ConnectionString);
+ return;
}
- if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath))
+ _connectionLock.Wait();
+
+ try
{
- await mClient.CreateTopicAsync(_asbOptions.TopicPath);
- _logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}");
+ if (_consumerClient == null)
+ {
+ ManagementClient mClient;
+ if (_asbOptions.ManagementTokenProvider != null)
+ {
+ mClient = new ManagementClient(new ServiceBusConnectionStringBuilder(
+ _asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider);
+ }
+ else
+ {
+ mClient = new ManagementClient(_asbOptions.ConnectionString);
+ }
+
+ if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath))
+ {
+ await mClient.CreateTopicAsync(_asbOptions.TopicPath);
+ _logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}");
+ }
+
+ if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
+ {
+ await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
+ _logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
+ }
+
+ _consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName,
+ ReceiveMode.PeekLock, RetryPolicy.Default);
+ }
}
-
- if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName))
+ finally
{
- await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName);
- _logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}");
+ _connectionLock.Release();
}
-
- _consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName,
- ReceiveMode.PeekLock, RetryPolicy.Default);
}
private Task OnConsumerReceived(Message message, CancellationToken token)
diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs
index a31bdd4..20f25f6 100644
--- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs
+++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs
@@ -2,17 +2,18 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory
{
private readonly ILoggerFactory _loggerFactory;
- private readonly AzureServiceBusOptions _asbOptions;
+ private readonly IOptions _asbOptions;
public AzureServiceBusConsumerClientFactory(
ILoggerFactory loggerFactory,
- AzureServiceBusOptions asbOptions)
+ IOptions asbOptions)
{
_loggerFactory = loggerFactory;
_asbOptions = asbOptions;
diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs
index b7d95c6..395c0f0 100644
--- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs
+++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs
@@ -21,9 +21,7 @@ namespace DotNetCore.CAP
{
services.AddSingleton();
- var azureServiceBusOptions = new AzureServiceBusOptions();
- _configure?.Invoke(azureServiceBusOptions);
- services.AddSingleton(azureServiceBusOptions);
+ services.Configure(_configure);
services.AddSingleton();
services.AddSingleton();
diff --git a/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs b/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs
index 898e58b..b68039e 100644
--- a/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs
+++ b/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs
@@ -3,40 +3,45 @@
using System;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.AzureServiceBus
{
internal class AzureServiceBusPublishMessageSender : BasePublishMessageSender
{
+ private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
private readonly ILogger _logger;
- private readonly ITopicClient _topicClient;
+ private readonly IOptions _asbOptions;
+
+ private ITopicClient _topicClient;
public AzureServiceBusPublishMessageSender(
ILogger logger,
- CapOptions options,
- AzureServiceBusOptions asbOptions,
+ IOptions options,
+ IOptions asbOptions,
IStateChanger stateChanger,
IStorageConnection connection)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
- ServersAddress = asbOptions.ConnectionString;
-
- _topicClient = new TopicClient(
- ServersAddress,
- asbOptions.TopicPath,
- RetryPolicy.NoRetry);
+ _asbOptions = asbOptions;
}
+ protected override string ServersAddress => _asbOptions.Value.ConnectionString;
+
public override async Task PublishAsync(string keyName, string content)
{
try
{
+ Connect();
+
var contentBytes = Encoding.UTF8.GetBytes(content);
var message = new Message
@@ -59,5 +64,27 @@ namespace DotNetCore.CAP.AzureServiceBus
return OperateResult.Failed(wrapperEx);
}
}
+
+ private void Connect()
+ {
+ if (_topicClient != null)
+ {
+ return;
+ }
+
+ _connectionLock.Wait();
+
+ try
+ {
+ if (_topicClient == null)
+ {
+ _topicClient = new TopicClient(ServersAddress, _asbOptions.Value.TopicPath, RetryPolicy.NoRetry);
+ }
+ }
+ finally
+ {
+ _connectionLock.Release();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs
index e9c2f4a..df03218 100644
--- a/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.InMemoryStorage/CAP.InMemoryCapOptionsExtension.cs
@@ -17,9 +17,9 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddSingleton();
- services.AddSingleton();
+ services.AddSingleton(x => (InMemoryPublisher)x.GetService());
+ services.AddSingleton();
- services.AddTransient();
services.AddTransient();
}
}
diff --git a/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs
index 61bdae6..3783ddb 100644
--- a/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs
+++ b/src/DotNetCore.CAP.InMemoryStorage/IStorageConnection.InMemory.cs
@@ -8,6 +8,7 @@ using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.InMemoryStorage
{
@@ -15,9 +16,9 @@ namespace DotNetCore.CAP.InMemoryStorage
{
private readonly CapOptions _capOptions;
- public InMemoryStorageConnection(CapOptions capOptions)
+ public InMemoryStorageConnection(IOptions capOptions)
{
- _capOptions = capOptions;
+ _capOptions = capOptions.Value;
PublishedMessages = new List();
ReceivedMessages = new List();
diff --git a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
index 88db8fe..55902e7 100644
--- a/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.Kafka/CAP.KafkaCapOptionsExtension.cs
@@ -21,9 +21,7 @@ namespace DotNetCore.CAP
{
services.AddSingleton();
- var kafkaOptions = new KafkaOptions();
- _configure?.Invoke(kafkaOptions);
- services.AddSingleton(kafkaOptions);
+ services.Configure(_configure);
services.AddSingleton();
services.AddSingleton();
diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
index bba836c..c052d29 100644
--- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
+++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
@@ -13,7 +13,7 @@
-
+
diff --git a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
index ab3c273..5417bdd 100644
--- a/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
+++ b/src/DotNetCore.CAP.Kafka/IConnectionPool.Default.cs
@@ -6,6 +6,7 @@ using System.Collections.Concurrent;
using System.Threading;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace DotNetCore.CAP.Kafka
@@ -17,18 +18,16 @@ namespace DotNetCore.CAP.Kafka
private int _pCount;
private int _maxSize;
- public ConnectionPool(ILogger logger, KafkaOptions options)
+ public ConnectionPool(ILogger logger, IOptions options)
{
- ServersAddress = options.Servers;
-
- _options = options;
+ _options = options.Value;
_producerPool = new ConcurrentQueue>();
- _maxSize = options.ConnectionPoolSize;
-
- logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(options.AsKafkaConfig(), Formatting.Indented));
+ _maxSize = _options.ConnectionPoolSize;
+
+ logger.LogDebug("Kafka configuration of CAP :\r\n {0}", JsonConvert.SerializeObject(_options.AsKafkaConfig(), Formatting.Indented));
}
- public string ServersAddress { get; }
+ public string ServersAddress => _options.Servers;
public IProducer RentProducer()
{
diff --git a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
index 0999fc2..2cd0b3e 100644
--- a/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
+++ b/src/DotNetCore.CAP.Kafka/IPublishMessageSender.Kafka.cs
@@ -7,6 +7,7 @@ using Confluent.Kafka;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
@@ -16,15 +17,19 @@ namespace DotNetCore.CAP.Kafka
private readonly ILogger _logger;
public KafkaPublishMessageSender(
- CapOptions options, IStateChanger stateChanger, IStorageConnection connection,
- IConnectionPool connectionPool, ILogger logger)
+ ILogger logger,
+ IOptions options,
+ IStorageConnection connection,
+ IConnectionPool connectionPool,
+ IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionPool = connectionPool;
- ServersAddress = _connectionPool.ServersAddress;
}
+ protected override string ServersAddress => _connectionPool.ServersAddress;
+
public override async Task PublishAsync(string keyName, string content)
{
var producer = _connectionPool.RentProducer();
diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
index 09cd14d..5753654 100644
--- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
+++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
@@ -5,25 +5,24 @@ using System;
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClient : IConsumerClient
{
+ private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
private readonly string _groupId;
private readonly KafkaOptions _kafkaOptions;
private IConsumer _consumerClient;
- public KafkaConsumerClient(string groupId, KafkaOptions options)
+ public KafkaConsumerClient(string groupId, IOptions options)
{
_groupId = groupId;
- _kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
-
- InitKafkaClient();
+ _kafkaOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}
- public IDeserializer StringDeserializer { get; set; }
-
public event EventHandler OnMessageReceived;
public event EventHandler OnLog;
@@ -37,11 +36,15 @@ namespace DotNetCore.CAP.Kafka
throw new ArgumentNullException(nameof(topics));
}
+ Connect();
+
_consumerClient.Subscribe(topics);
}
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
+ Connect();
+
while (true)
{
var consumerResult = _consumerClient.Consume(cancellationToken);
@@ -77,17 +80,32 @@ namespace DotNetCore.CAP.Kafka
#region private methods
- private void InitKafkaClient()
+ private void Connect()
{
- lock (_kafkaOptions)
+ if (_consumerClient != null)
{
- _kafkaOptions.MainConfig["group.id"] = _groupId;
- _kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
- var config = _kafkaOptions.AsKafkaConfig();
- _consumerClient = new ConsumerBuilder(config)
- .SetErrorHandler(ConsumerClient_OnConsumeError)
- .Build();
+ return;
}
+
+ _connectionLock.Wait();
+
+ try
+ {
+ if (_consumerClient == null)
+ {
+ _kafkaOptions.MainConfig["group.id"] = _groupId;
+ _kafkaOptions.MainConfig["auto.offset.reset"] = "earliest";
+ var config = _kafkaOptions.AsKafkaConfig();
+
+ _consumerClient = new ConsumerBuilder(config)
+ .SetErrorHandler(ConsumerClient_OnConsumeError)
+ .Build();
+ }
+ }
+ finally
+ {
+ _connectionLock.Release();
+ }
}
private void ConsumerClient_OnConsumeError(IConsumer consumer, Error e)
diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
index 7426312..1c1ebf0 100644
--- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
+++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs
@@ -1,13 +1,15 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using Microsoft.Extensions.Options;
+
namespace DotNetCore.CAP.Kafka
{
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory
{
- private readonly KafkaOptions _kafkaOptions;
+ private readonly IOptions _kafkaOptions;
- public KafkaConsumerClientFactory(KafkaOptions kafkaOptions)
+ public KafkaConsumerClientFactory(IOptions kafkaOptions)
{
_kafkaOptions = kafkaOptions;
}
diff --git a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
index 6f38367..a3ae8bf 100644
--- a/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
@@ -5,6 +5,7 @@ using System;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
@@ -25,18 +26,20 @@ namespace DotNetCore.CAP.MongoDB
services.AddSingleton();
services.AddSingleton();
- services.AddScoped();
- services.AddScoped();
+ services.AddSingleton();
+ services.AddSingleton(x => (MongoDBPublisher)x.GetService());
+ services.AddSingleton();
- services.AddTransient();
services.AddTransient();
- var options = new MongoDBOptions();
- _configure?.Invoke(options);
- services.AddSingleton(options);
+ services.Configure(_configure);
//Try to add IMongoClient if does not exists
- services.TryAddSingleton(new MongoClient(options.DatabaseConnection));
+ services.TryAddSingleton(x =>
+ {
+ var options = x.GetService>().Value;
+ return new MongoClient(options.DatabaseConnection);
+ });
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs
index d43e868..a3ea451 100644
--- a/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/ICapPublisher.MongoDB.cs
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
@@ -16,10 +17,9 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
- public MongoDBPublisher(IServiceProvider provider, MongoDBOptions options)
- : base(provider)
+ public MongoDBPublisher(IServiceProvider provider) : base(provider)
{
- _options = options;
+ _options = provider.GetService>().Value;
_client = ServiceProvider.GetRequiredService();
}
@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.MongoDB
protected override Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken))
{
- var insertOptions = new InsertOneOptions {BypassDocumentValidation = false};
+ var insertOptions = new InsertOneOptions { BypassDocumentValidation = false };
var collection = _client
.GetDatabase(_options.DatabaseName)
@@ -51,11 +51,10 @@ namespace DotNetCore.CAP.MongoDB
if (NotUseTransaction)
{
-
return collection.InsertOneAsync(store, insertOptions, cancel);
}
- var dbTrans = (IClientSessionHandle) transaction.DbTransaction;
+ var dbTrans = (IClientSessionHandle)transaction.DbTransaction;
return collection.InsertOneAsync(dbTrans, store, insertOptions, cancel);
}
}
diff --git a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
index dd0ce50..e52bea7 100644
--- a/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/ICapTransaction.MongoDB.cs
@@ -39,6 +39,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IClientSessionHandle)?.Dispose();
+ DbTransaction = null;
}
}
diff --git a/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs
index d807598..5a9b200 100644
--- a/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/ICollectProcessor.MongoDB.cs
@@ -3,9 +3,9 @@
using System;
using System.Threading.Tasks;
-using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
@@ -17,19 +17,19 @@ namespace DotNetCore.CAP.MongoDB
private readonly MongoDBOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
- public MongoDBCollectProcessor(ILogger logger,
- MongoDBOptions options,
+ public MongoDBCollectProcessor(
+ ILogger logger,
+ IOptions options,
IMongoClient client)
{
- _options = options;
+ _options = options.Value;
_logger = logger;
_database = client.GetDatabase(_options.DatabaseName);
}
public async Task ProcessAsync(ProcessingContext context)
{
- _logger.LogDebug(
- $"Collecting expired data from collection [{_options.PublishedCollection}].");
+ _logger.LogDebug($"Collecting expired data from collection [{_options.PublishedCollection}].");
var publishedCollection = _database.GetCollection(_options.PublishedCollection);
var receivedCollection = _database.GetCollection(_options.ReceivedCollection);
@@ -39,6 +39,7 @@ namespace DotNetCore.CAP.MongoDB
new DeleteManyModel(
Builders.Filter.Lt(x => x.ExpiresAt, DateTime.Now))
});
+
await receivedCollection.BulkWriteAsync(new[]
{
new DeleteManyModel(
diff --git a/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs
index 0553b1b..6bf0a26 100644
--- a/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/IMonitoringApi.MongoDB.cs
@@ -7,6 +7,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;
@@ -17,10 +18,10 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
- public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
+ public MongoDBMonitoringApi(IMongoClient client, IOptions options)
{
var mongoClient = client ?? throw new ArgumentNullException(nameof(client));
- _options = options ?? throw new ArgumentNullException(nameof(options));
+ _options = options.Value ?? throw new ArgumentNullException(nameof(options));
_database = mongoClient.GetDatabase(_options.DatabaseName);
}
@@ -140,7 +141,7 @@ namespace DotNetCore.CAP.MongoDB
private int GetNumberOfMessage(string collectionName, string statusName)
{
var collection = _database.GetCollection(collectionName);
- var count = collection.CountDocuments(new BsonDocument {{"StatusName", statusName}});
+ var count = collection.CountDocuments(new BsonDocument { { "StatusName", statusName } });
return int.Parse(count.ToString());
}
@@ -199,7 +200,7 @@ namespace DotNetCore.CAP.MongoDB
}
};
- var pipeline = new[] {match, groupby};
+ var pipeline = new[] { match, groupby };
var collection = _database.GetCollection(collectionName);
var result = collection.Aggregate(pipeline).ToList();
diff --git a/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs
index 87b50a9..f5df9fe 100644
--- a/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/IStorage.MongoDB.cs
@@ -6,19 +6,21 @@ using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorage : IStorage
{
- private readonly CapOptions _capOptions;
+ private readonly IOptions _capOptions;
private readonly IMongoClient _client;
private readonly ILogger _logger;
- private readonly MongoDBOptions _options;
+ private readonly IOptions _options;
- public MongoDBStorage(CapOptions capOptions,
- MongoDBOptions options,
+ public MongoDBStorage(
+ IOptions capOptions,
+ IOptions options,
IMongoClient client,
ILogger logger)
{
@@ -45,31 +47,32 @@ namespace DotNetCore.CAP.MongoDB
return;
}
- var database = _client.GetDatabase(_options.DatabaseName);
- var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();
+ var options = _options.Value;
+ var database = _client.GetDatabase(options.DatabaseName);
+ var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken)).ToList();
- if (names.All(n => n != _options.ReceivedCollection))
+ if (names.All(n => n != options.ReceivedCollection))
{
- await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
+ await database.CreateCollectionAsync(options.ReceivedCollection, cancellationToken: cancellationToken);
}
- if (names.All(n => n != _options.PublishedCollection))
+ if (names.All(n => n != options.PublishedCollection))
{
- await database.CreateCollectionAsync(_options.PublishedCollection,
+ await database.CreateCollectionAsync(options.PublishedCollection,
cancellationToken: cancellationToken);
}
- var receivedMessageIndexNames = new string[] {
+ var receivedMessageIndexNames = new[] {
nameof(ReceivedMessage.Name), nameof(ReceivedMessage.Added), nameof(ReceivedMessage.ExpiresAt),
nameof(ReceivedMessage.StatusName), nameof(ReceivedMessage.Retries), nameof(ReceivedMessage.Version) };
- var publishedMessageIndexNames = new string[] {
+ var publishedMessageIndexNames = new[] {
nameof(PublishedMessage.Name), nameof(PublishedMessage.Added), nameof(PublishedMessage.ExpiresAt),
nameof(PublishedMessage.StatusName), nameof(PublishedMessage.Retries), nameof(PublishedMessage.Version) };
await Task.WhenAll(
- TryCreateIndexesAsync(_options.ReceivedCollection, receivedMessageIndexNames),
- TryCreateIndexesAsync(_options.PublishedCollection, publishedMessageIndexNames)
+ TryCreateIndexesAsync(options.ReceivedCollection, receivedMessageIndexNames),
+ TryCreateIndexesAsync(options.PublishedCollection, publishedMessageIndexNames)
);
_logger.LogDebug("Ensuring all create database tables script are applied.");
@@ -87,15 +90,15 @@ namespace DotNetCore.CAP.MongoDB
if (indexNames.Any() == false)
return;
- var indexes = indexNames.Select(index_name =>
+ var indexes = indexNames.Select(indexName =>
{
var indexOptions = new CreateIndexOptions
{
- Name = index_name,
+ Name = indexName,
Background = true,
};
var indexBuilder = Builders.IndexKeys;
- return new CreateIndexModel(indexBuilder.Descending(index_name), indexOptions);
+ return new CreateIndexModel(indexBuilder.Descending(indexName), indexOptions);
}).ToArray();
await col.Indexes.CreateManyAsync(indexes, cancellationToken);
diff --git a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs
index eaad75a..387e4c7 100644
--- a/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/IStorageConnection.MongoDB.cs
@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
using MongoDB.Driver;
namespace DotNetCore.CAP.MongoDB
@@ -17,10 +18,13 @@ namespace DotNetCore.CAP.MongoDB
private readonly IMongoDatabase _database;
private readonly MongoDBOptions _options;
- public MongoDBStorageConnection(CapOptions capOptions, MongoDBOptions options, IMongoClient client)
+ public MongoDBStorageConnection(
+ IOptions capOptions,
+ IOptions options,
+ IMongoClient client)
{
- _capOptions = capOptions;
- _options = options;
+ _capOptions = capOptions.Value;
+ _options = options.Value;
_client = client;
_database = _client.GetDatabase(_options.DatabaseName);
}
diff --git a/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs
index 7d1a22b..1cbeff8 100644
--- a/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs
+++ b/src/DotNetCore.CAP.MongoDB/IStorageTransaction.MongoDB.cs
@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.MongoDB
public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{
_options = options;
- _database = client.GetDatabase(options.DatabaseName);
+ _database = client.GetDatabase(_options.DatabaseName);
_session = client.StartSession();
_session.StartTransaction();
}
diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
index 55a3377..d9c7569 100644
--- a/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.MySql/CAP.MySqlCapOptionsExtension.cs
@@ -4,8 +4,8 @@
using System;
using DotNetCore.CAP.MySql;
using DotNetCore.CAP.Processor;
-using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -24,39 +24,15 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton(provider => (MySqlPublisher)provider.GetService());
+ services.AddSingleton();
- services.AddScoped();
- services.AddScoped();
-
- services.AddTransient();
services.AddTransient();
- AddSingletionMySqlOptions(services);
- }
-
- private void AddSingletionMySqlOptions(IServiceCollection services)
- {
- var mysqlOptions = new MySqlOptions();
-
- _configure(mysqlOptions);
-
- if (mysqlOptions.DbContextType != null)
- {
- services.AddSingleton(x =>
- {
- using (var scope = x.CreateScope())
- {
- var provider = scope.ServiceProvider;
- var dbContext = (DbContext) provider.GetService(mysqlOptions.DbContextType);
- mysqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
- return mysqlOptions;
- }
- });
- }
- else
- {
- services.AddSingleton(mysqlOptions);
- }
- }
+ //Add MySqlOptions
+ services.Configure(_configure);
+ services.AddSingleton, ConfigureMySqlOptions>();
+ }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
index bcac400..bac37d2 100644
--- a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
+++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
@@ -1,6 +1,10 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
@@ -10,4 +14,29 @@ namespace DotNetCore.CAP
///
public string ConnectionString { get; set; }
}
+
+ internal class ConfigureMySqlOptions : IConfigureOptions
+ {
+ private readonly IServiceScopeFactory _serviceScopeFactory;
+
+ public ConfigureMySqlOptions(IServiceScopeFactory serviceScopeFactory)
+ {
+ _serviceScopeFactory = serviceScopeFactory;
+ }
+
+ public void Configure(MySqlOptions options)
+ {
+ if (options.DbContextType != null)
+ {
+ using (var scope = _serviceScopeFactory.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
+ {
+ options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
+ }
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs b/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs
index f22c82a..d95c959 100644
--- a/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/ICapPublisher.MySql.cs
@@ -10,6 +10,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.MySql
public MySqlPublisher(IServiceProvider provider) : base(provider)
{
- _options = provider.GetService();
+ _options = provider.GetService>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
diff --git a/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs b/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
index 7a77648..3dd9b9e 100644
--- a/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
@@ -50,6 +50,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
+ DbTransaction = null;
}
}
diff --git a/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs b/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
index 2232e9e..55955c7 100644
--- a/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/ICollectProcessor.MySql.cs
@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
@@ -18,11 +19,10 @@ namespace DotNetCore.CAP.MySql
private readonly MySqlOptions _options;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
- public MySqlCollectProcessor(ILogger logger,
- MySqlOptions mysqlOptions)
+ public MySqlCollectProcessor(ILogger logger, IOptions mysqlOptions)
{
_logger = logger;
- _options = mysqlOptions;
+ _options = mysqlOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
@@ -44,7 +44,7 @@ namespace DotNetCore.CAP.MySql
{
removedCount = await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @now limit @count;",
- new {now = DateTime.Now, count = MaxBatch});
+ new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
diff --git a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
index 353a279..7cd4135 100644
--- a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs
@@ -10,6 +10,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.MySql
{
@@ -18,10 +19,10 @@ namespace DotNetCore.CAP.MySql
private readonly string _prefix;
private readonly MySqlStorage _storage;
- public MySqlMonitoringApi(IStorage storage, MySqlOptions options)
+ public MySqlMonitoringApi(IStorage storage, IOptions options)
{
_storage = storage as MySqlStorage ?? throw new ArgumentNullException(nameof(storage));
- _prefix = options?.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
+ _prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options));
}
public StatisticsDto GetStatistics()
@@ -126,7 +127,7 @@ select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix);
{
var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state";
- var count = connection.ExecuteScalar(sqlQuery, new {state = statusName});
+ var count = connection.ExecuteScalar(sqlQuery, new { state = statusName });
return count;
}
@@ -169,7 +170,7 @@ select aggr.* from (
var valuesMap = connection.Query(
sqlQuery,
- new {keys = keyMaps.Keys, statusName})
+ new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => x.Key, x => x.Count);
foreach (var key in keyMaps.Keys)
diff --git a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IStorage.MySql.cs
index 564935f..5be40b8 100644
--- a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/IStorage.MySql.cs
@@ -8,20 +8,22 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlStorage : IStorage
{
- private readonly CapOptions _capOptions;
+ private readonly IOptions _capOptions;
+ private readonly IOptions _options;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
- private readonly MySqlOptions _options;
- public MySqlStorage(ILogger logger,
- MySqlOptions options,
- CapOptions capOptions)
+ public MySqlStorage(
+ ILogger logger,
+ IOptions options,
+ IOptions capOptions)
{
_options = options;
_capOptions = capOptions;
@@ -45,10 +47,10 @@ namespace DotNetCore.CAP.MySql
return;
}
- var sql = CreateDbTablesScript(_options.TableNamePrefix);
- using (var connection = new MySqlConnection(_options.ConnectionString))
+ var sql = CreateDbTablesScript(_options.Value.TableNamePrefix);
+ using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
- await connection.ExecuteAsync(sql);
+ await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
@@ -103,7 +105,7 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
internal IDbConnection CreateAndOpenConnection()
{
- var connection = _existingConnection ?? new MySqlConnection(_options.ConnectionString);
+ var connection = _existingConnection ?? new MySqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
diff --git a/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs b/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs
index 47d2fae..5f300bb 100644
--- a/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs
+++ b/src/DotNetCore.CAP.MySql/IStorageConnection.MySql.cs
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
@@ -14,16 +15,17 @@ namespace DotNetCore.CAP.MySql
public class MySqlStorageConnection : IStorageConnection
{
private readonly CapOptions _capOptions;
+ private readonly IOptions _options;
private readonly string _prefix;
- public MySqlStorageConnection(MySqlOptions options, CapOptions capOptions)
+ public MySqlStorageConnection(IOptions options, IOptions capOptions)
{
- _capOptions = capOptions;
- Options = options;
- _prefix = Options.TableNamePrefix;
+ _options = options;
+ _capOptions = capOptions.Value;
+ _prefix = options.Value.TableNamePrefix;
}
- public MySqlOptions Options { get; }
+ public MySqlOptions Options => _options.Value;
public IStorageTransaction CreateTransaction()
{
diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
index 1c0e266..a3abe19 100644
--- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
@@ -4,8 +4,8 @@
using System;
using DotNetCore.CAP.PostgreSql;
using DotNetCore.CAP.Processor;
-using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -24,38 +24,14 @@ namespace DotNetCore.CAP
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton(provider => (PostgreSqlPublisher)provider.GetService());
+ services.AddSingleton();
- services.AddScoped();
- services.AddScoped();
-
- services.AddTransient();
services.AddTransient();
- AddSingletonPostgreSqlOptions(services);
- }
-
- private void AddSingletonPostgreSqlOptions(IServiceCollection services)
- {
- var postgreSqlOptions = new PostgreSqlOptions();
- _configure(postgreSqlOptions);
-
- if (postgreSqlOptions.DbContextType != null)
- {
- services.AddSingleton(x =>
- {
- using (var scope = x.CreateScope())
- {
- var provider = scope.ServiceProvider;
- var dbContext = (DbContext) provider.GetService(postgreSqlOptions.DbContextType);
- postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
- return postgreSqlOptions;
- }
- });
- }
- else
- {
- services.AddSingleton(postgreSqlOptions);
- }
+ services.Configure(_configure);
+ services.AddSingleton, ConfigurePostgreSqlOptions>();
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
index 9f52154..ba30375 100644
--- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
+++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
@@ -1,6 +1,11 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
+// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
@@ -10,4 +15,29 @@ namespace DotNetCore.CAP
///
public string ConnectionString { get; set; }
}
+
+ internal class ConfigurePostgreSqlOptions : IConfigureOptions
+ {
+ private readonly IServiceScopeFactory _serviceScopeFactory;
+
+ public ConfigurePostgreSqlOptions(IServiceScopeFactory serviceScopeFactory)
+ {
+ _serviceScopeFactory = serviceScopeFactory;
+ }
+
+ public void Configure(PostgreSqlOptions options)
+ {
+ if (options.DbContextType != null)
+ {
+ using (var scope = _serviceScopeFactory.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
+ {
+ options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
+ }
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs
index 95c0c63..ec9e787 100644
--- a/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/ICapPublisher.PostgreSql.cs
@@ -10,6 +10,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.PostgreSql
public PostgreSqlPublisher(IServiceProvider provider) : base(provider)
{
- _options = provider.GetService();
+ _options = provider.GetService>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
diff --git a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs
index ef6b4d7..08a0af8 100644
--- a/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/ICapTransaction.PostgreSql.cs
@@ -50,6 +50,7 @@ namespace DotNetCore.CAP
public override void Dispose()
{
(DbTransaction as IDbTransaction)?.Dispose();
+ DbTransaction = null;
}
}
diff --git a/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
index d82f0b8..8235b57 100644
--- a/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/ICollectlProcessor.PostgreSql.cs
@@ -6,6 +6,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
@@ -25,10 +26,10 @@ namespace DotNetCore.CAP.PostgreSql
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public PostgreSqlCollectProcessor(ILogger logger,
- PostgreSqlOptions sqlServerOptions)
+ IOptions sqlServerOptions)
{
_logger = logger;
- _options = sqlServerOptions;
+ _options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
@@ -44,7 +45,7 @@ namespace DotNetCore.CAP.PostgreSql
{
removedCount = await connection.ExecuteAsync(
$"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
- new {now = DateTime.Now, count = MaxBatch});
+ new { now = DateTime.Now, count = MaxBatch });
}
if (removedCount != 0)
diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
index 17b81b2..9760188 100644
--- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
@@ -10,6 +10,7 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.PostgreSql
{
@@ -18,9 +19,9 @@ namespace DotNetCore.CAP.PostgreSql
private readonly PostgreSqlOptions _options;
private readonly PostgreSqlStorage _storage;
- public PostgreSqlMonitoringApi(IStorage storage, PostgreSqlOptions options)
+ public PostgreSqlMonitoringApi(IStorage storage, IOptions options)
{
- _options = options ?? throw new ArgumentNullException(nameof(options));
+ _options = options.Value ?? throw new ArgumentNullException(nameof(options));
_storage = storage as PostgreSqlStorage ?? throw new ArgumentNullException(nameof(storage));
}
diff --git a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs
index 187fac2..c13d18a 100644
--- a/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/IStorage.PostgreSql.cs
@@ -8,20 +8,21 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Dashboard;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
- private readonly CapOptions _capOptions;
+ private readonly IOptions _capOptions;
private readonly IDbConnection _existingConnection = null;
private readonly ILogger _logger;
- private readonly PostgreSqlOptions _options;
+ private readonly IOptions _options;
public PostgreSqlStorage(ILogger logger,
- CapOptions capOptions,
- PostgreSqlOptions options)
+ IOptions capOptions,
+ IOptions options)
{
_options = options;
_logger = logger;
@@ -45,9 +46,9 @@ namespace DotNetCore.CAP.PostgreSql
return;
}
- var sql = CreateDbTablesScript(_options.Schema);
+ var sql = CreateDbTablesScript(_options.Value.Schema);
- using (var connection = new NpgsqlConnection(_options.ConnectionString))
+ using (var connection = new NpgsqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
@@ -72,7 +73,7 @@ namespace DotNetCore.CAP.PostgreSql
internal IDbConnection CreateAndOpenConnection()
{
- var connection = _existingConnection ?? new NpgsqlConnection(_options.ConnectionString);
+ var connection = _existingConnection ?? new NpgsqlConnection(_options.Value.ConnectionString);
if (connection.State == ConnectionState.Closed)
{
diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs
index f6f6b87..afb3296 100644
--- a/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs
+++ b/src/DotNetCore.CAP.PostgreSql/IStorageConnection.PostgreSql.cs
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
+using Microsoft.Extensions.Options;
using Npgsql;
namespace DotNetCore.CAP.PostgreSql
@@ -15,10 +16,12 @@ namespace DotNetCore.CAP.PostgreSql
{
private readonly CapOptions _capOptions;
- public PostgreSqlStorageConnection(PostgreSqlOptions options, CapOptions capOptions)
+ public PostgreSqlStorageConnection(
+ IOptions options,
+ IOptions capOptions)
{
- _capOptions = capOptions;
- Options = options;
+ _capOptions = capOptions.Value;
+ Options = options.Value;
}
public PostgreSqlOptions Options { get; }
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
index d52fe61..f5130a0 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
@@ -6,6 +6,7 @@
using System;
using RabbitMQ.Client;
+// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class RabbitMQOptions
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
index a952313..ac1455a 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbitMQCapOptionsExtension.cs
@@ -20,11 +20,8 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton();
-
- var options = new RabbitMQOptions();
- _configure?.Invoke(options);
- services.AddSingleton(options);
-
+
+ services.Configure(_configure);
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
diff --git a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
index 2bff115..587525a 100644
--- a/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/IConnectionChannelPool.Default.cs
@@ -7,6 +7,7 @@ using System.Diagnostics;
using System.Reflection;
using System.Threading;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace DotNetCore.CAP.RabbitMQ
@@ -18,37 +19,34 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly ILogger _logger;
private readonly ConcurrentQueue _pool;
private IConnection _connection;
- private static readonly object s_lock = new object();
+ private static readonly object SLock = new object();
private int _count;
private int _maxSize;
- public ConnectionChannelPool(ILogger logger,
- CapOptions capOptions,
- RabbitMQOptions options)
+ public ConnectionChannelPool(
+ ILogger logger,
+ IOptions capOptionsAccessor,
+ IOptions optionsAccessor)
{
_logger = logger;
_maxSize = DefaultPoolSize;
_pool = new ConcurrentQueue();
- _connectionActivator = CreateConnection(options);
- HostAddress = options.HostName + ":" + options.Port;
+ var capOptions = capOptionsAccessor.Value;
+ var options = optionsAccessor.Value;
- if (CapOptions.DefaultVersion == capOptions.Version)
- {
- Exchange = options.ExchangeName;
- }
- else
- {
- Exchange = options.ExchangeName + "." + capOptions.Version;
- }
+ _connectionActivator = CreateConnection(options);
+
+ HostAddress = $"{options.HostName}:{options.Port}";
+ Exchange = CapOptions.DefaultVersion == capOptions.Version ? options.ExchangeName : $"{options.ExchangeName}.{capOptions.Version}";
_logger.LogDebug($"RabbitMQ configuration:'HostName:{options.HostName}, Port:{options.Port}, UserName:{options.UserName}, Password:{options.Password}, ExchangeName:{options.ExchangeName}'");
}
IModel IConnectionChannelPool.Rent()
{
- lock (s_lock)
+ lock (SLock)
{
while (_count > _maxSize)
{
@@ -100,14 +98,14 @@ namespace DotNetCore.CAP.RabbitMQ
Password = options.Password,
VirtualHost = options.VirtualHost
};
-
+
if (options.HostName.Contains(","))
{
options.ConnectionFactoryOptions?.Invoke(factory);
return () => factory.CreateConnection(
options.HostName.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries), serviceName);
}
-
+
factory.HostName = options.HostName;
options.ConnectionFactoryOptions?.Invoke(factory);
return () => factory.CreateConnection(serviceName);
@@ -135,7 +133,7 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception e)
{
- _logger.LogError(e,"RabbitMQ channel model create failed!");
+ _logger.LogError(e, "RabbitMQ channel model create failed!");
Console.WriteLine(e);
throw;
}
diff --git a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs
index 6041e5e..ac4f2b9 100644
--- a/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/IPublishMessageSender.RabbitMQ.cs
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
@@ -18,16 +19,21 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly ILogger _logger;
private readonly string _exchange;
- public RabbitMQPublishMessageSender(ILogger logger, CapOptions options,
- IStorageConnection connection, IConnectionChannelPool connectionChannelPool, IStateChanger stateChanger)
+ public RabbitMQPublishMessageSender(
+ ILogger logger,
+ IOptions options,
+ IStorageConnection connection,
+ IConnectionChannelPool connectionChannelPool,
+ IStateChanger stateChanger)
: base(logger, options, connection, stateChanger)
{
_logger = logger;
_connectionChannelPool = connectionChannelPool;
_exchange = _connectionChannelPool.Exchange;
- ServersAddress = _connectionChannelPool.HostAddress;
}
+ protected override string ServersAddress => _connectionChannelPool.HostAddress;
+
public override Task PublishAsync(string keyName, string content)
{
var channel = _connectionChannelPool.Rent();
@@ -48,14 +54,14 @@ namespace DotNetCore.CAP.RabbitMQ
}
catch (Exception ex)
{
- var wapperEx = new PublisherSentFailedException(ex.Message, ex);
+ var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};
- return Task.FromResult(OperateResult.Failed(wapperEx, errors));
+ return Task.FromResult(OperateResult.Failed(wrapperEx, errors));
}
finally
{
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
index 5ce6f8b..953ee06 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
+using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
@@ -12,6 +13,8 @@ namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClient : IConsumerClient
{
+ private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
private readonly IConnectionChannelPool _connectionChannelPool;
private readonly string _exchangeName;
private readonly string _queueName;
@@ -23,14 +26,12 @@ namespace DotNetCore.CAP.RabbitMQ
public RabbitMQConsumerClient(string queueName,
IConnectionChannelPool connectionChannelPool,
- RabbitMQOptions options)
+ IOptions options)
{
_queueName = queueName;
_connectionChannelPool = connectionChannelPool;
- _rabbitMQOptions = options;
+ _rabbitMQOptions = options.Value;
_exchangeName = connectionChannelPool.Exchange;
-
- InitClient();
}
public event EventHandler OnMessageReceived;
@@ -46,6 +47,8 @@ namespace DotNetCore.CAP.RabbitMQ
throw new ArgumentNullException(nameof(topics));
}
+ Connect();
+
foreach (var topic in topics)
{
_channel.QueueBind(_queueName, _exchangeName, topic);
@@ -54,6 +57,8 @@ namespace DotNetCore.CAP.RabbitMQ
public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
+ Connect();
+
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
@@ -88,25 +93,39 @@ namespace DotNetCore.CAP.RabbitMQ
_connection.Dispose();
}
- private void InitClient()
- {
- _connection = _connectionChannelPool.GetConnection();
+ #region events
- _channel = _connection.CreateModel();
+ private void Connect()
+ {
+ if (_connection != null)
+ {
+ return;
+ }
- _channel.ExchangeDeclare(
- _exchangeName,
- RabbitMQOptions.ExchangeType,
- true);
+ _connectionLock.Wait();
- var arguments = new Dictionary
+ try
{
- {"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
- };
- _channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
- }
+ if (_connection == null)
+ {
+ _connection = _connectionChannelPool.GetConnection();
- #region events
+ _channel = _connection.CreateModel();
+
+ _channel.ExchangeDeclare(_exchangeName, RabbitMQOptions.ExchangeType, true);
+
+ var arguments = new Dictionary
+ {
+ {"x-message-ttl", _rabbitMQOptions.QueueMessageExpires}
+ };
+ _channel.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);
+ }
+ }
+ finally
+ {
+ _connectionLock.Release();
+ }
+ }
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
{
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
index f9eb71e..0a9d541 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs
@@ -1,14 +1,16 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using Microsoft.Extensions.Options;
+
namespace DotNetCore.CAP.RabbitMQ
{
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
private readonly IConnectionChannelPool _connectionChannelPool;
- private readonly RabbitMQOptions _rabbitMQOptions;
+ private readonly IOptions _rabbitMQOptions;
- public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions, IConnectionChannelPool channelPool)
+ public RabbitMQConsumerClientFactory(IOptions rabbitMQOptions, IConnectionChannelPool channelPool)
{
_rabbitMQOptions = rabbitMQOptions;
_connectionChannelPool = channelPool;
diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
index 924ca58..3fa9f9b 100644
--- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
+++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs
@@ -5,8 +5,8 @@ using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.SqlServer;
using DotNetCore.CAP.SqlServer.Diagnostics;
-using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -23,42 +23,18 @@ namespace DotNetCore.CAP
public void AddServices(IServiceCollection services)
{
services.AddSingleton();
+
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton(x => (SqlServerPublisher)x.GetService());
+ services.AddSingleton();
- services.AddScoped();
- services.AddScoped();
-
- services.AddTransient();
services.AddTransient();
- AddSqlServerOptions(services);
- }
-
- private void AddSqlServerOptions(IServiceCollection services)
- {
- var sqlServerOptions = new SqlServerOptions();
-
- _configure(sqlServerOptions);
-
- if (sqlServerOptions.DbContextType != null)
- {
- services.AddSingleton(x =>
- {
- using (var scope = x.CreateScope())
- {
- var provider = scope.ServiceProvider;
- var dbContext = (DbContext) provider.GetService(sqlServerOptions.DbContextType);
- sqlServerOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
- return sqlServerOptions;
- }
- });
- }
- else
- {
- services.AddSingleton(sqlServerOptions);
- }
+ services.Configure(_configure);
+ services.AddSingleton, ConfigureSqlServerOptions>();
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs
index b5c1e5f..730e823 100644
--- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs
+++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs
@@ -1,6 +1,11 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
+// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class SqlServerOptions : EFOptions
@@ -10,4 +15,30 @@ namespace DotNetCore.CAP
///
public string ConnectionString { get; set; }
}
+
+
+ internal class ConfigureSqlServerOptions : IConfigureOptions
+ {
+ private readonly IServiceScopeFactory _serviceScopeFactory;
+
+ public ConfigureSqlServerOptions(IServiceScopeFactory serviceScopeFactory)
+ {
+ _serviceScopeFactory = serviceScopeFactory;
+ }
+
+ public void Configure(SqlServerOptions options)
+ {
+ if (options.DbContextType != null)
+ {
+ using (var scope = _serviceScopeFactory.CreateScope())
+ {
+ var provider = scope.ServiceProvider;
+ using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
+ {
+ options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
+ }
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs
index 19de84f..d759edb 100644
--- a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs
+++ b/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs
@@ -11,6 +11,7 @@ using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
@@ -20,7 +21,7 @@ namespace DotNetCore.CAP.SqlServer
public SqlServerPublisher(IServiceProvider provider) : base(provider)
{
- _options = ServiceProvider.GetService();
+ _options = ServiceProvider.GetService>().Value;
}
public async Task PublishCallbackAsync(CapPublishedMessage message)
diff --git a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs
index d06f33a..76badfc 100644
--- a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs
+++ b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs
@@ -12,6 +12,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
@@ -23,9 +24,9 @@ namespace DotNetCore.CAP
public SqlServerCapTransaction(
IDispatcher dispatcher,
- SqlServerOptions sqlServerOptions,
IServiceProvider serviceProvider) : base(dispatcher)
{
+ var sqlServerOptions = serviceProvider.GetService>().Value;
if (sqlServerOptions.DbContextType != null)
{
_dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext;
@@ -56,14 +57,14 @@ namespace DotNetCore.CAP
}
}
- var transactionKey = ((SqlConnection) dbTransaction.Connection).ClientConnectionId;
+ var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId;
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list))
{
list.Add(msg);
}
else
{
- var msgList = new List(1) {msg};
+ var msgList = new List(1) { msg };
_diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList);
}
}
@@ -109,6 +110,7 @@ namespace DotNetCore.CAP
dbContextTransaction.Dispose();
break;
}
+ DbTransaction = null;
}
}
@@ -149,7 +151,7 @@ namespace DotNetCore.CAP
var dbTransaction = dbConnection.BeginTransaction();
var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit);
- return (IDbTransaction) capTransaction.DbTransaction;
+ return (IDbTransaction)capTransaction.DbTransaction;
}
///
diff --git a/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
index c7a4d03..c734e73 100644
--- a/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
+++ b/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
@@ -25,10 +26,10 @@ namespace DotNetCore.CAP.SqlServer
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
public SqlServerCollectProcessor(ILogger logger,
- SqlServerOptions sqlServerOptions)
+ IOptions sqlServerOptions)
{
_logger = logger;
- _options = sqlServerOptions;
+ _options = sqlServerOptions.Value;
}
public async Task ProcessAsync(ProcessingContext context)
diff --git a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs
index 435301a..4804406 100644
--- a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs
+++ b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs
@@ -11,6 +11,7 @@ using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.SqlServer.Diagnostics;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.SqlServer
{
@@ -23,14 +24,14 @@ namespace DotNetCore.CAP.SqlServer
private readonly SqlServerOptions _options;
public SqlServerStorage(ILogger logger,
- CapOptions capOptions,
- SqlServerOptions options,
+ IOptions capOptions,
+ IOptions options,
DiagnosticProcessorObserver diagnosticProcessorObserver)
{
- _options = options;
+ _options = options.Value;
_diagnosticProcessorObserver = diagnosticProcessorObserver;
_logger = logger;
- _capOptions = capOptions;
+ _capOptions = capOptions.Value;
}
public IStorageConnection GetConnection()
diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
index 5e1b236..0cf4250 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
@@ -15,9 +15,9 @@ namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
{
- private readonly CapTransactionBase _transaction;
private readonly IMessagePacker _msgPacker;
private readonly IContentSerializer _serializer;
+ private CapTransactionBase _transaction;
protected bool NotUseTransaction;
@@ -28,14 +28,27 @@ namespace DotNetCore.CAP.Abstractions
protected CapPublisherBase(IServiceProvider service)
{
ServiceProvider = service;
- _transaction = service.GetRequiredService();
_msgPacker = service.GetRequiredService();
_serializer = service.GetRequiredService();
}
protected IServiceProvider ServiceProvider { get; }
- public ICapTransaction Transaction => _transaction;
+ public ICapTransaction Transaction
+ {
+ get
+ {
+ if (_transaction == null)
+ {
+ using (var scope = ServiceProvider.CreateScope())
+ {
+ _transaction = scope.ServiceProvider.GetRequiredService();
+ }
+ }
+
+ return _transaction;
+ }
+ }
public void Publish(string name, T contentObj, string callbackName = null)
{
@@ -99,7 +112,7 @@ namespace DotNetCore.CAP.Abstractions
{
if (NotUseTransaction || Transaction.AutoCommit)
{
- _transaction.Dispose();
+ _transaction?.Dispose();
}
}
}
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
index a08cfad..c28743d 100644
--- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
+++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
-using System.Collections.Generic;
using DotNetCore.CAP;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Internal;
@@ -11,7 +10,6 @@ using DotNetCore.CAP.Processor.States;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection.Extensions;
-using Microsoft.Extensions.Hosting;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
@@ -58,7 +56,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton();
//Queue's message processor
- services.TryAddSingleton();
+ services.TryAddSingleton();
services.TryAddSingleton();
//Sender and Executors
@@ -73,11 +71,11 @@ namespace Microsoft.Extensions.DependencyInjection
{
serviceExtension.AddServices(services);
}
- services.AddSingleton(options);
+ services.Configure(setupAction);
- //Startup and Middleware
- services.AddTransient();
+ //Startup and Hosted
services.AddTransient();
+ services.AddHostedService();
return new CapBuilder(services);
}
diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
index eea68de..892d42d 100644
--- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj
+++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
@@ -33,11 +33,9 @@
-
-
+
-
diff --git a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
index ae0507a..e64a9b7 100644
--- a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
@@ -1,222 +1,223 @@
-// Copyright (c) .NET Core Community. All rights reserved.
-// Licensed under the MIT License. See License.txt in the project root for license information.
-
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Reflection;
-using System.Text.RegularExpressions;
-using DotNetCore.CAP.Abstractions;
-using DotNetCore.CAP.Infrastructure;
-using Microsoft.Extensions.DependencyInjection;
-using System.Collections.Concurrent;
-
-namespace DotNetCore.CAP
-{
- ///
- ///
- /// A default implementation.
- ///
- public class DefaultConsumerServiceSelector : IConsumerServiceSelector
- {
- private readonly CapOptions _capOptions;
- private readonly IServiceProvider _serviceProvider;
-
- ///
- /// since this class be designed as a Singleton service,the following two list must be thread safe!
- ///
- private readonly ConcurrentDictionary>> _asteriskList;
- private readonly ConcurrentDictionary>> _poundList;
-
- ///
- /// Creates a new .
- ///
- public DefaultConsumerServiceSelector(IServiceProvider serviceProvider, CapOptions capOptions)
- {
- _serviceProvider = serviceProvider;
- _capOptions = capOptions;
-
- _asteriskList = new ConcurrentDictionary>>();
- _poundList = new ConcurrentDictionary>>();
- }
-
- public IReadOnlyList SelectCandidates()
- {
- var executorDescriptorList = new List();
-
- executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider));
-
- executorDescriptorList.AddRange(FindConsumersFromControllerTypes());
-
- return executorDescriptorList;
- }
-
- public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList executeDescriptor)
- {
- var result = MatchUsingName(key, executeDescriptor);
- if (result != null)
- {
- return result;
- }
-
- //[*] match with regex, i.e. foo.*.abc
- result = MatchAsteriskUsingRegex(key, executeDescriptor);
- if (result != null)
- {
- return result;
- }
-
- //[#] match regex, i.e. foo.#
- result = MatchPoundUsingRegex(key, executeDescriptor);
- return result;
- }
-
- protected virtual IEnumerable FindConsumersFromInterfaceTypes(
- IServiceProvider provider)
- {
- var executorDescriptorList = new List();
-
- var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo();
-
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text.RegularExpressions;
+using DotNetCore.CAP.Abstractions;
+using DotNetCore.CAP.Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using System.Collections.Concurrent;
+using Microsoft.Extensions.Options;
+
+namespace DotNetCore.CAP
+{
+ ///
+ ///
+ /// A default implementation.
+ ///
+ public class DefaultConsumerServiceSelector : IConsumerServiceSelector
+ {
+ private readonly CapOptions _capOptions;
+ private readonly IServiceProvider _serviceProvider;
+
+ ///
+ /// since this class be designed as a Singleton service,the following two list must be thread safe!
+ ///
+ private readonly ConcurrentDictionary>> _asteriskList;
+ private readonly ConcurrentDictionary>> _poundList;
+
+ ///
+ /// Creates a new .
+ ///
+ public DefaultConsumerServiceSelector(IServiceProvider serviceProvider)
+ {
+ _serviceProvider = serviceProvider;
+ _capOptions = serviceProvider.GetService>().Value;
+
+ _asteriskList = new ConcurrentDictionary>>();
+ _poundList = new ConcurrentDictionary>>();
+ }
+
+ public IReadOnlyList SelectCandidates()
+ {
+ var executorDescriptorList = new List();
+
+ executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider));
+
+ executorDescriptorList.AddRange(FindConsumersFromControllerTypes());
+
+ return executorDescriptorList;
+ }
+
+ public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList executeDescriptor)
+ {
+ var result = MatchUsingName(key, executeDescriptor);
+ if (result != null)
+ {
+ return result;
+ }
+
+ //[*] match with regex, i.e. foo.*.abc
+ result = MatchAsteriskUsingRegex(key, executeDescriptor);
+ if (result != null)
+ {
+ return result;
+ }
+
+ //[#] match regex, i.e. foo.#
+ result = MatchPoundUsingRegex(key, executeDescriptor);
+ return result;
+ }
+
+ protected virtual IEnumerable FindConsumersFromInterfaceTypes(
+ IServiceProvider provider)
+ {
+ var executorDescriptorList = new List();
+
+ var capSubscribeTypeInfo = typeof(ICapSubscribe).GetTypeInfo();
+
foreach (var service in ServiceCollectionExtensions.ServiceCollection.Where(o => o.ImplementationType != null && o.ServiceType != null))
{
var typeInfo = service.ImplementationType.GetTypeInfo();
if (!capSubscribeTypeInfo.IsAssignableFrom(typeInfo))
{
continue;
- }
+ }
var serviceTypeInfo = service.ServiceType.GetTypeInfo();
executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo, serviceTypeInfo));
}
- return executorDescriptorList;
- }
-
- protected virtual IEnumerable FindConsumersFromControllerTypes()
- {
- var executorDescriptorList = new List();
-
- var types = Assembly.GetEntryAssembly().ExportedTypes;
- foreach (var type in types)
- {
- var typeInfo = type.GetTypeInfo();
- if (Helper.IsController(typeInfo))
- {
- executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
- }
- }
-
- return executorDescriptorList;
- }
-
- protected IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null)
- {
- foreach (var method in typeInfo.DeclaredMethods)
- {
- var topicAttr = method.GetCustomAttributes(true);
- var topicAttributes = topicAttr as IList ?? topicAttr.ToList();
-
- if (!topicAttributes.Any())
- {
- continue;
- }
-
- foreach (var attr in topicAttributes)
- {
- if (attr.Group == null)
- {
- attr.Group = _capOptions.DefaultGroup + "." + _capOptions.Version;
- }
- else
- {
- attr.Group = attr.Group + "." + _capOptions.Version;
- }
-
- yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo);
- }
- }
- }
-
- private static ConsumerExecutorDescriptor InitDescriptor(
- TopicAttribute attr,
- MethodInfo methodInfo,
+ return executorDescriptorList;
+ }
+
+ protected virtual IEnumerable FindConsumersFromControllerTypes()
+ {
+ var executorDescriptorList = new List();
+
+ var types = Assembly.GetEntryAssembly().ExportedTypes;
+ foreach (var type in types)
+ {
+ var typeInfo = type.GetTypeInfo();
+ if (Helper.IsController(typeInfo))
+ {
+ executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
+ }
+ }
+
+ return executorDescriptorList;
+ }
+
+ protected IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null)
+ {
+ foreach (var method in typeInfo.DeclaredMethods)
+ {
+ var topicAttr = method.GetCustomAttributes(true);
+ var topicAttributes = topicAttr as IList ?? topicAttr.ToList();
+
+ if (!topicAttributes.Any())
+ {
+ continue;
+ }
+
+ foreach (var attr in topicAttributes)
+ {
+ if (attr.Group == null)
+ {
+ attr.Group = _capOptions.DefaultGroup + "." + _capOptions.Version;
+ }
+ else
+ {
+ attr.Group = attr.Group + "." + _capOptions.Version;
+ }
+
+ yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo);
+ }
+ }
+ }
+
+ private static ConsumerExecutorDescriptor InitDescriptor(
+ TopicAttribute attr,
+ MethodInfo methodInfo,
TypeInfo implType,
- TypeInfo serviceTypeInfo)
- {
- var descriptor = new ConsumerExecutorDescriptor
- {
- Attribute = attr,
- MethodInfo = methodInfo,
+ TypeInfo serviceTypeInfo)
+ {
+ var descriptor = new ConsumerExecutorDescriptor
+ {
+ Attribute = attr,
+ MethodInfo = methodInfo,
ImplTypeInfo = implType,
- ServiceTypeInfo = serviceTypeInfo
- };
-
- return descriptor;
- }
-
- private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList executeDescriptor)
- {
- return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
- }
-
- private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList executeDescriptor)
- {
- var group = executeDescriptor.First().Attribute.Group;
- if (!_asteriskList.TryGetValue(group, out var tmpList))
- {
- tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
- .Select(x => new RegexExecuteDescriptor
- {
- Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."),
- Descriptor = x
- }).ToList();
- _asteriskList.TryAdd(group, tmpList);
- }
-
- foreach (var red in tmpList)
- {
- if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
- {
- return red.Descriptor;
- }
- }
-
- return null;
- }
-
- private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList executeDescriptor)
- {
- var group = executeDescriptor.First().Attribute.Group;
- if (!_poundList.TryGetValue(group, out var tmpList))
- {
- tmpList = executeDescriptor
- .Where(x => x.Attribute.Name.IndexOf('#') >= 0)
- .Select(x => new RegexExecuteDescriptor
- {
- Name = ("^" + x.Attribute.Name + "$").Replace("#", "[0-9_a-zA-Z\\.]+"),
- Descriptor = x
- }).ToList();
- _poundList.TryAdd(group, tmpList);
- }
-
- foreach (var red in tmpList)
- {
- if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
- {
- return red.Descriptor;
- }
- }
-
- return null;
- }
-
-
- private class RegexExecuteDescriptor
- {
- public string Name { get; set; }
-
- public T Descriptor { get; set; }
- }
- }
+ ServiceTypeInfo = serviceTypeInfo
+ };
+
+ return descriptor;
+ }
+
+ private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList executeDescriptor)
+ {
+ return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
+ }
+
+ private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList executeDescriptor)
+ {
+ var group = executeDescriptor.First().Attribute.Group;
+ if (!_asteriskList.TryGetValue(group, out var tmpList))
+ {
+ tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
+ .Select(x => new RegexExecuteDescriptor
+ {
+ Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\\."),
+ Descriptor = x
+ }).ToList();
+ _asteriskList.TryAdd(group, tmpList);
+ }
+
+ foreach (var red in tmpList)
+ {
+ if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
+ {
+ return red.Descriptor;
+ }
+ }
+
+ return null;
+ }
+
+ private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList executeDescriptor)
+ {
+ var group = executeDescriptor.First().Attribute.Group;
+ if (!_poundList.TryGetValue(group, out var tmpList))
+ {
+ tmpList = executeDescriptor
+ .Where(x => x.Attribute.Name.IndexOf('#') >= 0)
+ .Select(x => new RegexExecuteDescriptor
+ {
+ Name = ("^" + x.Attribute.Name + "$").Replace("#", "[0-9_a-zA-Z\\.]+"),
+ Descriptor = x
+ }).ToList();
+ _poundList.TryAdd(group, tmpList);
+ }
+
+ foreach (var red in tmpList)
+ {
+ if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
+ {
+ return red.Descriptor;
+ }
+ }
+
+ return null;
+ }
+
+
+ private class RegexExecuteDescriptor
+ {
+ public string Name { get; set; }
+
+ public T Descriptor { get; set; }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
index 45292ab..873929f 100644
--- a/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
+++ b/src/DotNetCore.CAP/IPublishMessageSender.Base.cs
@@ -11,6 +11,7 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
@@ -21,7 +22,7 @@ namespace DotNetCore.CAP
private readonly CapOptions _options;
private readonly IStateChanger _stateChanger;
- protected string ServersAddress { get; set; }
+ protected abstract string ServersAddress { get; }
// diagnostics listener
// ReSharper disable once InconsistentNaming
@@ -30,11 +31,11 @@ namespace DotNetCore.CAP
protected BasePublishMessageSender(
ILogger logger,
- CapOptions options,
+ IOptions options,
IStorageConnection connection,
IStateChanger stateChanger)
{
- _options = options;
+ _options = options.Value;
_connection = connection;
_stateChanger = stateChanger;
_logger = logger;
diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
index 72e27c2..81b6576 100644
--- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
+++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
@@ -11,6 +11,7 @@ using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP
{
@@ -30,7 +31,7 @@ namespace DotNetCore.CAP
public DefaultSubscriberExecutor(
ILogger logger,
- CapOptions options,
+ IOptions options,
IConsumerInvokerFactory consumerInvokerFactory,
ICallbackMessageSender callbackMessageSender,
IStateChanger stateChanger,
@@ -39,7 +40,7 @@ namespace DotNetCore.CAP
{
_selector = selector;
_callbackMessageSender = callbackMessageSender;
- _options = options;
+ _options = options.Value;
_stateChanger = stateChanger;
_connection = connection;
_logger = logger;
diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
index e7fb78b..bb29d2b 100644
--- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
+++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
@@ -94,7 +94,7 @@ namespace DotNetCore.CAP.Processor
var returnedProcessors = new List
{
_provider.GetRequiredService(),
- _provider.GetRequiredService(),
+ _provider.GetRequiredService(),
_provider.GetRequiredService()
};
diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
index ee1dfdf..66c1b39 100644
--- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
+++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
@@ -7,27 +7,28 @@ using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
namespace DotNetCore.CAP.Processor
{
- public class NeedRetryMessageProcessor : IProcessor
+ public class MessageNeedToRetryProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
- private readonly ILogger _logger;
+ private readonly ILogger