@@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection; | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
public class KafkaCapOptionsExtension : ICapOptionsExtension | |||||
internal sealed class KafkaCapOptionsExtension : ICapOptionsExtension | |||||
{ | { | ||||
private readonly Action<KafkaOptions> _configure; | private readonly Action<KafkaOptions> _configure; | ||||
@@ -16,10 +16,8 @@ namespace DotNetCore.CAP | |||||
public void AddServices(IServiceCollection services) | public void AddServices(IServiceCollection services) | ||||
{ | { | ||||
services.Configure(_configure); | |||||
var kafkaOptions = new KafkaOptions(); | var kafkaOptions = new KafkaOptions(); | ||||
_configure(kafkaOptions); | |||||
_configure?.Invoke(kafkaOptions); | |||||
services.AddSingleton(kafkaOptions); | services.AddSingleton(kafkaOptions); | ||||
services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); | services.AddSingleton<IConsumerClientFactory, KafkaConsumerClientFactory>(); | ||||
@@ -6,6 +6,10 @@ namespace Microsoft.Extensions.DependencyInjection | |||||
{ | { | ||||
public static class CapOptionsExtensions | public static class CapOptionsExtensions | ||||
{ | { | ||||
/// <summary> | |||||
/// Configuration to use kafka in CAP. | |||||
/// </summary> | |||||
/// <param name="bootstrapServers">Kafka bootstrap server urls.</param> | |||||
public static CapOptions UseKafka(this CapOptions options, string bootstrapServers) | public static CapOptions UseKafka(this CapOptions options, string bootstrapServers) | ||||
{ | { | ||||
return options.UseKafka(opt => | return options.UseKafka(opt => | ||||
@@ -14,6 +18,11 @@ namespace Microsoft.Extensions.DependencyInjection | |||||
}); | }); | ||||
} | } | ||||
/// <summary> | |||||
/// Configuration to use kafka in CAP. | |||||
/// </summary> | |||||
/// <param name="configure">Provides programmatic configuration for the kafka .</param> | |||||
/// <returns></returns> | |||||
public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure) | public static CapOptions UseKafka(this CapOptions options, Action<KafkaOptions> configure) | ||||
{ | { | ||||
if (configure == null) throw new ArgumentNullException(nameof(configure)); | if (configure == null) throw new ArgumentNullException(nameof(configure)); | ||||
@@ -3,6 +3,9 @@ | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
/// <summary> | |||||
/// An attribute for subscribe Kafka messages. | |||||
/// </summary> | |||||
public class CapSubscribeAttribute : TopicAttribute | public class CapSubscribeAttribute : TopicAttribute | ||||
{ | { | ||||
public CapSubscribeAttribute(string name) | public CapSubscribeAttribute(string name) | ||||
@@ -6,7 +6,7 @@ using Confluent.Kafka.Serialization; | |||||
namespace DotNetCore.CAP.Kafka | namespace DotNetCore.CAP.Kafka | ||||
{ | { | ||||
public class KafkaConsumerClient : IConsumerClient | |||||
internal sealed class KafkaConsumerClient : IConsumerClient | |||||
{ | { | ||||
private readonly string _groupId; | private readonly string _groupId; | ||||
private readonly KafkaOptions _kafkaOptions; | private readonly KafkaOptions _kafkaOptions; | ||||
@@ -3,13 +3,13 @@ using Microsoft.Extensions.Options; | |||||
namespace DotNetCore.CAP.Kafka | namespace DotNetCore.CAP.Kafka | ||||
{ | { | ||||
public class KafkaConsumerClientFactory : IConsumerClientFactory | |||||
internal sealed class KafkaConsumerClientFactory : IConsumerClientFactory | |||||
{ | { | ||||
private readonly KafkaOptions _kafkaOptions; | private readonly KafkaOptions _kafkaOptions; | ||||
public KafkaConsumerClientFactory(IOptions<KafkaOptions> kafkaOptions) | |||||
public KafkaConsumerClientFactory(KafkaOptions kafkaOptions) | |||||
{ | { | ||||
_kafkaOptions = kafkaOptions?.Value ?? throw new ArgumentNullException(nameof(kafkaOptions)); | |||||
_kafkaOptions = kafkaOptions; | |||||
} | } | ||||
public IConsumerClient Create(string groupId) | public IConsumerClient Create(string groupId) | ||||
@@ -4,22 +4,21 @@ using System.Threading.Tasks; | |||||
using Confluent.Kafka; | using Confluent.Kafka; | ||||
using DotNetCore.CAP.Processor.States; | using DotNetCore.CAP.Processor.States; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using Microsoft.Extensions.Options; | |||||
namespace DotNetCore.CAP.Kafka | namespace DotNetCore.CAP.Kafka | ||||
{ | { | ||||
public class PublishQueueExecutor : BasePublishQueueExecutor | |||||
internal class PublishQueueExecutor : BasePublishQueueExecutor | |||||
{ | { | ||||
private readonly ILogger _logger; | private readonly ILogger _logger; | ||||
private readonly KafkaOptions _kafkaOptions; | private readonly KafkaOptions _kafkaOptions; | ||||
public PublishQueueExecutor(IStateChanger stateChanger, | public PublishQueueExecutor(IStateChanger stateChanger, | ||||
IOptions<KafkaOptions> options, | |||||
KafkaOptions options, | |||||
ILogger<PublishQueueExecutor> logger) | ILogger<PublishQueueExecutor> logger) | ||||
: base(stateChanger, logger) | : base(stateChanger, logger) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_kafkaOptions = options.Value; | |||||
_kafkaOptions = options; | |||||
} | } | ||||
public override Task<OperateResult> PublishAsync(string keyName, string content) | public override Task<OperateResult> PublishAsync(string keyName, string content) | ||||
@@ -7,7 +7,7 @@ using Microsoft.Extensions.DependencyInjection; | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
public class MySqlCapOptionsExtension : ICapOptionsExtension | |||||
internal class MySqlCapOptionsExtension : ICapOptionsExtension | |||||
{ | { | ||||
private readonly Action<MySqlOptions> _configure; | private readonly Action<MySqlOptions> _configure; | ||||
@@ -2,7 +2,7 @@ | |||||
namespace DotNetCore.CAP.MySql | namespace DotNetCore.CAP.MySql | ||||
{ | { | ||||
public class FetchedMessage | |||||
internal class FetchedMessage | |||||
{ | { | ||||
public int MessageId { get; set; } | public int MessageId { get; set; } | ||||
@@ -7,7 +7,7 @@ using MySql.Data.MySqlClient; | |||||
namespace DotNetCore.CAP.MySql | namespace DotNetCore.CAP.MySql | ||||
{ | { | ||||
public class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
internal class DefaultAdditionalProcessor : IAdditionalProcessor | |||||
{ | { | ||||
private readonly IServiceProvider _provider; | private readonly IServiceProvider _provider; | ||||
private readonly ILogger _logger; | private readonly ILogger _logger; | ||||
@@ -4,7 +4,7 @@ using DotNetCore.CAP; | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace Microsoft.Extensions.DependencyInjection | namespace Microsoft.Extensions.DependencyInjection | ||||
{ | { | ||||
public static class CapOptionsExtensions | |||||
internal static class CapOptionsExtensions | |||||
{ | { | ||||
public static CapOptions UseRabbitMQ(this CapOptions options, string hostName) | public static CapOptions UseRabbitMQ(this CapOptions options, string hostName) | ||||
{ | { | ||||
@@ -5,7 +5,7 @@ using Microsoft.Extensions.DependencyInjection; | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
public class RabbitMQCapOptionsExtension : ICapOptionsExtension | |||||
internal sealed class RabbitMQCapOptionsExtension : ICapOptionsExtension | |||||
{ | { | ||||
private readonly Action<RabbitMQOptions> _configure; | private readonly Action<RabbitMQOptions> _configure; | ||||
@@ -16,12 +16,9 @@ namespace DotNetCore.CAP | |||||
public void AddServices(IServiceCollection services) | public void AddServices(IServiceCollection services) | ||||
{ | { | ||||
services.Configure(_configure); | |||||
var rabbitMQOptions = new RabbitMQOptions(); | |||||
_configure(rabbitMQOptions); | |||||
services.AddSingleton(rabbitMQOptions); | |||||
var options = new RabbitMQOptions(); | |||||
_configure?.Invoke(options); | |||||
services.AddSingleton(options); | |||||
services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); | services.AddSingleton<IConsumerClientFactory, RabbitMQConsumerClientFactory>(); | ||||
services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | services.AddTransient<IQueueExecutor, PublishQueueExecutor>(); | ||||
@@ -3,6 +3,9 @@ | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
/// <summary> | |||||
/// An attribute for subscribe RabbitMQ messages. | |||||
/// </summary> | |||||
public class CapSubscribeAttribute : TopicAttribute | public class CapSubscribeAttribute : TopicAttribute | ||||
{ | { | ||||
public CapSubscribeAttribute(string name) : base(name) | public CapSubscribeAttribute(string name) : base(name) | ||||
@@ -8,18 +8,18 @@ using RabbitMQ.Client; | |||||
namespace DotNetCore.CAP.RabbitMQ | namespace DotNetCore.CAP.RabbitMQ | ||||
{ | { | ||||
public class PublishQueueExecutor : BasePublishQueueExecutor | |||||
internal sealed class PublishQueueExecutor : BasePublishQueueExecutor | |||||
{ | { | ||||
private readonly ILogger _logger; | private readonly ILogger _logger; | ||||
private readonly RabbitMQOptions _rabbitMQOptions; | private readonly RabbitMQOptions _rabbitMQOptions; | ||||
public PublishQueueExecutor(IStateChanger stateChanger, | public PublishQueueExecutor(IStateChanger stateChanger, | ||||
IOptions<RabbitMQOptions> options, | |||||
RabbitMQOptions options, | |||||
ILogger<PublishQueueExecutor> logger) | ILogger<PublishQueueExecutor> logger) | ||||
: base(stateChanger, logger) | : base(stateChanger, logger) | ||||
{ | { | ||||
_logger = logger; | _logger = logger; | ||||
_rabbitMQOptions = options.Value; | |||||
_rabbitMQOptions = options; | |||||
} | } | ||||
public override Task<OperateResult> PublishAsync(string keyName, string content) | public override Task<OperateResult> PublishAsync(string keyName, string content) | ||||
@@ -7,7 +7,7 @@ using RabbitMQ.Client.Events; | |||||
namespace DotNetCore.CAP.RabbitMQ | namespace DotNetCore.CAP.RabbitMQ | ||||
{ | { | ||||
public class RabbitMQConsumerClient : IConsumerClient | |||||
internal sealed class RabbitMQConsumerClient : IConsumerClient | |||||
{ | { | ||||
private readonly string _exchageName; | private readonly string _exchageName; | ||||
private readonly string _queueName; | private readonly string _queueName; | ||||
@@ -2,13 +2,13 @@ | |||||
namespace DotNetCore.CAP.RabbitMQ | namespace DotNetCore.CAP.RabbitMQ | ||||
{ | { | ||||
public class RabbitMQConsumerClientFactory : IConsumerClientFactory | |||||
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory | |||||
{ | { | ||||
private readonly RabbitMQOptions _rabbitMQOptions; | private readonly RabbitMQOptions _rabbitMQOptions; | ||||
public RabbitMQConsumerClientFactory(IOptions<RabbitMQOptions> rabbitMQOptions) | |||||
public RabbitMQConsumerClientFactory(RabbitMQOptions rabbitMQOptions) | |||||
{ | { | ||||
_rabbitMQOptions = rabbitMQOptions.Value; | |||||
_rabbitMQOptions = rabbitMQOptions; | |||||
} | } | ||||
public IConsumerClient Create(string groupId) | public IConsumerClient Create(string groupId) | ||||
@@ -7,7 +7,7 @@ using Microsoft.Extensions.DependencyInjection; | |||||
// ReSharper disable once CheckNamespace | // ReSharper disable once CheckNamespace | ||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
public class SqlServerCapOptionsExtension : ICapOptionsExtension | |||||
internal class SqlServerCapOptionsExtension : ICapOptionsExtension | |||||
{ | { | ||||
private readonly Action<SqlServerOptions> _configure; | private readonly Action<SqlServerOptions> _configure; | ||||
@@ -2,7 +2,7 @@ | |||||
namespace DotNetCore.CAP.SqlServer | namespace DotNetCore.CAP.SqlServer | ||||
{ | { | ||||
public class FetchedMessage | |||||
internal class FetchedMessage | |||||
{ | { | ||||
public int MessageId { get; set; } | public int MessageId { get; set; } | ||||