From b2a0420e86d5a2d67a80906d41aad58502a7be33 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 16 Feb 2019 20:56:49 +0800 Subject: [PATCH] Features: Add Azure Service Bus Support (#287) Add Azure Service Bus supported --- CAP.sln | 14 ++ .../Controllers/ValuesController.cs | 70 +++++++ .../Sample.AzureServiceBus.MySql/Program.cs | 19 ++ .../Sample.AzureServiceBus.MySql.csproj | 19 ++ .../Sample.AzureServiceBus.MySql/Startup.cs | 25 +++ .../appsettings.json | 8 + .../AzureServiceBusConsumerClient.cs | 174 ++++++++++++++++++ .../AzureServiceBusConsumerClientFactory.cs | 27 +++ .../CAP.AzureServiceBusOptions.cs | 40 ++++ .../CAP.AzureServiceBusOptionsExtension.cs | 33 ++++ .../CAP.Options.Extensions.cs | 44 +++++ .../CAP.SubscribeAttribute.cs | 25 +++ .../DotNetCore.CAP.AzureServiceBus.csproj | 23 +++ .../IPublishMessageSender.AzureServiceBus.cs | 63 +++++++ .../IConsumerHandler.Default.cs | 5 +- src/DotNetCore.CAP/MessageContext.cs | 4 +- src/DotNetCore.CAP/MqLogType.cs | 5 +- 17 files changed, 595 insertions(+), 3 deletions(-) create mode 100644 samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs create mode 100644 samples/Sample.AzureServiceBus.MySql/Program.cs create mode 100644 samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj create mode 100644 samples/Sample.AzureServiceBus.MySql/Startup.cs create mode 100644 samples/Sample.AzureServiceBus.MySql/appsettings.json create mode 100644 src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/CAP.Options.Extensions.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/CAP.SubscribeAttribute.cs create mode 100644 src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj create mode 100644 src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs diff --git a/CAP.sln b/CAP.sln index 93f9a15..c9c05b5 100644 --- a/CAP.sln +++ b/CAP.sln @@ -62,6 +62,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MongoDB", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.MySql", "samples\Sample.Kafka.MySql\Sample.Kafka.MySql.csproj", "{11563D1A-27CC-45CF-8C04-C16BCC21250A}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AzureServiceBus", "src\DotNetCore.CAP.AzureServiceBus\DotNetCore.CAP.AzureServiceBus.csproj", "{63B2A464-FBEA-42FB-8EFA-98AFA39FC920}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AzureServiceBus.MySql", "samples\Sample.AzureServiceBus.MySql\Sample.AzureServiceBus.MySql.csproj", "{364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -124,6 +128,14 @@ Global {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.ActiveCfg = Release|Any CPU {11563D1A-27CC-45CF-8C04-C16BCC21250A}.Release|Any CPU.Build.0 = Release|Any CPU + {63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Debug|Any CPU.Build.0 = Debug|Any CPU + {63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Release|Any CPU.ActiveCfg = Release|Any CPU + {63B2A464-FBEA-42FB-8EFA-98AFA39FC920}.Release|Any CPU.Build.0 = Release|Any CPU + {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -144,6 +156,8 @@ Global {77C0AC02-C44B-49D5-B969-7D5305FC20A5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {4473DE19-E8D2-4B57-80A8-C8AAA2BFA20F} = {3A6B6931-A123-477A-9469-8B468B5385AF} {11563D1A-27CC-45CF-8C04-C16BCC21250A} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {63B2A464-FBEA-42FB-8EFA-98AFA39FC920} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {364A72B0-3AD2-4BC4-8D22-5A0484E2A08B} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs b/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs new file mode 100644 index 0000000..bd3f6d7 --- /dev/null +++ b/samples/Sample.AzureServiceBus.MySql/Controllers/ValuesController.cs @@ -0,0 +1,70 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP; +using Microsoft.AspNetCore.Mvc; +using MySql.Data.MySqlClient; + +namespace Sample.AzureServiceBus.MySql.Controllers +{ + [Route("api/[controller]")] + public class ValuesController : Controller, ICapSubscribe + { + private readonly ICapPublisher _capBus; + + public ValuesController(ICapPublisher producer) + { + _capBus = producer; + } + + [Route("~/without/transaction")] + public async Task WithoutTransaction() + { + await _capBus.PublishAsync("sample.azure.mysql", DateTime.Now); + + return Ok(); + } + + [Route("~/adonet/transaction")] + public IActionResult AdonetWithTransaction() + { + using (var connection = new MySqlConnection("")) + { + using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false)) + { + //your business code + connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); + + for (int i = 0; i < 5; i++) + { + _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); + } + + transaction.Commit(); + } + } + + return Ok(); + } + + + [CapSubscribe("sample.azure.mysql")] + public void Test2(DateTime value) + { + Console.WriteLine("Subscriber output message: " + value); + } + + [CapSubscribe("sample.azure.mysql")] + public void Test2T2(DateTime value) + { + Console.WriteLine("Test2T2-->Subscriber output message: " + value); + } + + [CapSubscribe("sample.azure.mysql",Group = "groupd")] + public void Test2Group(DateTime value) + { + Console.WriteLine("Group--> Subscriber output message: " + value); + } + } +} \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.MySql/Program.cs b/samples/Sample.AzureServiceBus.MySql/Program.cs new file mode 100644 index 0000000..2be0f27 --- /dev/null +++ b/samples/Sample.AzureServiceBus.MySql/Program.cs @@ -0,0 +1,19 @@ +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; + +namespace Sample.AzureServiceBus.MySql +{ + public class Program + { + + public static void Main(string[] args) + { + BuildWebHost(args).Run(); + } + + public static IWebHost BuildWebHost(string[] args) => + WebHost.CreateDefaultBuilder(args) + .UseStartup() + .Build(); + } +} \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj b/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj new file mode 100644 index 0000000..60cc292 --- /dev/null +++ b/samples/Sample.AzureServiceBus.MySql/Sample.AzureServiceBus.MySql.csproj @@ -0,0 +1,19 @@ + + + + netcoreapp2.2 + Sample.Kafka.MySql + NU1701 + NU1701 + + + + + + + + + + + + diff --git a/samples/Sample.AzureServiceBus.MySql/Startup.cs b/samples/Sample.AzureServiceBus.MySql/Startup.cs new file mode 100644 index 0000000..d2bbc35 --- /dev/null +++ b/samples/Sample.AzureServiceBus.MySql/Startup.cs @@ -0,0 +1,25 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; + +namespace Sample.AzureServiceBus.MySql +{ + public class Startup + { + public void ConfigureServices(IServiceCollection services) + { + services.AddCap(x => + { + x.UseMySql("Server=localhost;Database=testcap;UserId=root;Password=123123;"); + x.UseAzureServiceBus("Endpoint=sb://testcap.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="); + x.UseDashboard(); + }); + + services.AddMvc(); + } + + public void Configure(IApplicationBuilder app) + { + app.UseMvc(); + } + } +} \ No newline at end of file diff --git a/samples/Sample.AzureServiceBus.MySql/appsettings.json b/samples/Sample.AzureServiceBus.MySql/appsettings.json new file mode 100644 index 0000000..20aa907 --- /dev/null +++ b/samples/Sample.AzureServiceBus.MySql/appsettings.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "IncludeScopes": false, + "LogLevel": { + "Default": "Debug" + } + } +} diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs new file mode 100644 index 0000000..e08125e --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -0,0 +1,174 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Management; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.AzureServiceBus +{ + internal sealed class AzureServiceBusConsumerClient : IConsumerClient + { + private readonly ILogger _logger; + private readonly string _subscriptionName; + private readonly AzureServiceBusOptions _asbOptions; + + private SubscriptionClient _consumerClient; + + private string _lockToken; + + public AzureServiceBusConsumerClient( + ILogger logger, + string subscriptionName, + AzureServiceBusOptions options) + { + _logger = logger; + _subscriptionName = subscriptionName; + _asbOptions = options ?? throw new ArgumentNullException(nameof(options)); + + InitAzureServiceBusClient().GetAwaiter().GetResult(); + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public string ServersAddress => _asbOptions.ConnectionString; + + public void Subscribe(IEnumerable topics) + { + if (topics == null) + { + throw new ArgumentNullException(nameof(topics)); + } + + var allRuleNames = _consumerClient.GetRulesAsync().GetAwaiter().GetResult().Select(x => x.Name); + + foreach (var newRule in topics.Except(allRuleNames)) + { + _consumerClient.AddRuleAsync(new RuleDescription + { + Filter = new CorrelationFilter { Label = newRule }, + Name = newRule + }).GetAwaiter().GetResult(); + + _logger.LogInformation($"Azure Service Bus add rule: {newRule}"); + } + + foreach (var oldRule in allRuleNames.Except(topics)) + { + _consumerClient.RemoveRuleAsync(oldRule).GetAwaiter().GetResult(); + + _logger.LogInformation($"Azure Service Bus remove rule: {oldRule}"); + } + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + + _consumerClient.RegisterMessageHandler(OnConsumerReceived, + new MessageHandlerOptions(OnExceptionReceived) + { + AutoComplete = false, + MaxConcurrentCalls = 10, + MaxAutoRenewDuration = TimeSpan.FromSeconds(30) + }); + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + // ReSharper disable once FunctionNeverReturns + } + + public void Commit() + { + _consumerClient.CompleteAsync(_lockToken); + } + + public void Reject() + { + // ignore + } + + public void Dispose() + { + _consumerClient.CloseAsync().Wait(); + } + + #region private methods + + private async Task InitAzureServiceBusClient() + { + ManagementClient mClient; + if (_asbOptions.ManagementTokenProvider != null) + { + mClient = new ManagementClient(new ServiceBusConnectionStringBuilder( + _asbOptions.ConnectionString), _asbOptions.ManagementTokenProvider); + } + else + { + mClient = new ManagementClient(_asbOptions.ConnectionString); + } + + if (!await mClient.TopicExistsAsync(_asbOptions.TopicPath)) + { + await mClient.CreateTopicAsync(_asbOptions.TopicPath); + _logger.LogInformation($"Azure Service Bus created topic: {_asbOptions.TopicPath}"); + } + + if (!await mClient.SubscriptionExistsAsync(_asbOptions.TopicPath, _subscriptionName)) + { + await mClient.CreateSubscriptionAsync(_asbOptions.TopicPath, _subscriptionName); + _logger.LogInformation($"Azure Service Bus topic {_asbOptions.TopicPath} created subscription: {_subscriptionName}"); + } + + _consumerClient = new SubscriptionClient(_asbOptions.ConnectionString, _asbOptions.TopicPath, _subscriptionName, + ReceiveMode.PeekLock, RetryPolicy.Default); + } + + private Task OnConsumerReceived(Message message, CancellationToken token) + { + _lockToken = message.SystemProperties.LockToken; + var context = new MessageContext + { + Group = _subscriptionName, + Name = message.Label, + Content = Encoding.UTF8.GetString(message.Body) + }; + + OnMessageReceived?.Invoke(null, context); + + return Task.CompletedTask; + } + + private Task OnExceptionReceived(ExceptionReceivedEventArgs args) + { + var context = args.ExceptionReceivedContext; + var exceptionMessage = + $"- Endpoint: {context.Endpoint}\r\n" + + $"- Entity Path: {context.EntityPath}\r\n" + + $"- Executing Action: {context.Action}\r\n" + + $"- Exception: {args.Exception}"; + + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ExceptionReceived, + Reason = exceptionMessage + }; + + OnLog?.Invoke(null, logArgs); + + return Task.CompletedTask; + } + + #endregion private methods + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs new file mode 100644 index 0000000..ddeaf0b --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.AzureServiceBus +{ + internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory + { + private readonly ILoggerFactory _loggerFactory; + private readonly AzureServiceBusOptions _asbOptions; + + public AzureServiceBusConsumerClientFactory( + ILoggerFactory loggerFactory, + AzureServiceBusOptions asbOptions) + { + _loggerFactory = loggerFactory; + _asbOptions = asbOptions; + } + + public IConsumerClient Create(string groupId) + { + var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient)); + return new AzureServiceBusConsumerClient(logger, groupId, _asbOptions); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs new file mode 100644 index 0000000..4729784 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -0,0 +1,40 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Primitives; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + /// + /// Provides programmatic configuration for the CAP Azure Service Bus project. + /// + public class AzureServiceBusOptions + { + /// + /// TopicPath default value for CAP. + /// + public const string DefaultTopicPath = "cap"; + + /// + /// Azure Service Bus Namespace connection string. Must not contain topic information. + /// + public string ConnectionString { get; set; } + + /// + /// The name of the topic relative to the service namespace base address. + /// + public string TopicPath { get; set; } = DefaultTopicPath; + + /// + /// Represents the Azure Active Directory token provider for Azure Managed Service Identity integration. + /// + public ITokenProvider ManagementTokenProvider { get; set; } + + /// + /// Used to generate Service Bus connection strings + /// + public ServiceBusConnectionStringBuilder ConnectionStringBuilder { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs new file mode 100644 index 0000000..b7d95c6 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptionsExtension.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.AzureServiceBus; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal sealed class AzureServiceBusOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public AzureServiceBusOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + + var azureServiceBusOptions = new AzureServiceBusOptions(); + _configure?.Invoke(azureServiceBusOptions); + services.AddSingleton(azureServiceBusOptions); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.Options.Extensions.cs new file mode 100644 index 0000000..0c83046 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.Options.Extensions.cs @@ -0,0 +1,44 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + /// + /// Configuration to use Azure Service Bus in CAP. + /// + /// CAP configuration options + /// Connection string for namespace or the entity. + public static CapOptions UseAzureServiceBus(this CapOptions options, string connectionString) + { + if (connectionString == null) + { + throw new ArgumentNullException(nameof(connectionString)); + } + + return options.UseAzureServiceBus(opt => { opt.ConnectionString = connectionString; }); + } + + /// + /// Configuration to use Azure Service Bus in CAP. + /// + /// CAP configuration options + /// Provides programmatic configuration for the Azure Service Bus. + public static CapOptions UseAzureServiceBus(this CapOptions options, Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + options.RegisterExtension(new AzureServiceBusOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.SubscribeAttribute.cs new file mode 100644 index 0000000..e745555 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.SubscribeAttribute.cs @@ -0,0 +1,25 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.Abstractions; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + /// + /// An attribute for subscribe Kafka messages. + /// + public class CapSubscribeAttribute : TopicAttribute + { + public CapSubscribeAttribute(string name) + : base(name) + { + + } + + public override string ToString() + { + return Name; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj b/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj new file mode 100644 index 0000000..aba90e7 --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/DotNetCore.CAP.AzureServiceBus.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.0 + DotNetCore.CAP.AzureServiceBus + $(PackageTags);AzureServiceBus + + + + NU1605;NU1701 + NU1701;CS1591 + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AzureServiceBus.xml + + + + + + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs b/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs new file mode 100644 index 0000000..898e58b --- /dev/null +++ b/src/DotNetCore.CAP.AzureServiceBus/IPublishMessageSender.AzureServiceBus.cs @@ -0,0 +1,63 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Processor.States; +using Microsoft.Azure.ServiceBus; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.AzureServiceBus +{ + internal class AzureServiceBusPublishMessageSender : BasePublishMessageSender + { + private readonly ILogger _logger; + private readonly ITopicClient _topicClient; + + public AzureServiceBusPublishMessageSender( + ILogger logger, + CapOptions options, + AzureServiceBusOptions asbOptions, + IStateChanger stateChanger, + IStorageConnection connection) + : base(logger, options, connection, stateChanger) + { + _logger = logger; + ServersAddress = asbOptions.ConnectionString; + + _topicClient = new TopicClient( + ServersAddress, + asbOptions.TopicPath, + RetryPolicy.NoRetry); + } + + public override async Task PublishAsync(string keyName, string content) + { + try + { + var contentBytes = Encoding.UTF8.GetBytes(content); + + var message = new Message + { + MessageId = Guid.NewGuid().ToString(), + Body = contentBytes, + Label = keyName, + }; + + await _topicClient.SendAsync(message); + + _logger.LogDebug($"Azure Service Bus message [{keyName}] has been published."); + + return OperateResult.Success; + } + catch (Exception ex) + { + var wrapperEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(wrapperEx); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index 75e3821..34c3450 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -159,11 +159,14 @@ namespace DotNetCore.CAP _logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason); break; case MqLogType.ConsumeError: - _logger.LogError("Kakfa client consume error. --> " + logmsg.Reason); + _logger.LogError("Kafka client consume error. --> " + logmsg.Reason); break; case MqLogType.ServerConnError: _logger.LogCritical("Kafka server connection error. --> " + logmsg.Reason); break; + case MqLogType.ExceptionReceived: + _logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason); + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/MessageContext.cs b/src/DotNetCore.CAP/MessageContext.cs index b42d7ca..17ae8ba 100644 --- a/src/DotNetCore.CAP/MessageContext.cs +++ b/src/DotNetCore.CAP/MessageContext.cs @@ -1,6 +1,8 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Collections.Generic; + namespace DotNetCore.CAP { /// @@ -21,7 +23,7 @@ namespace DotNetCore.CAP /// /// Message content /// - public string Content { get; set; } + public string Content { get; set; } public override string ToString() { diff --git a/src/DotNetCore.CAP/MqLogType.cs b/src/DotNetCore.CAP/MqLogType.cs index 3d84a48..94c21c0 100644 --- a/src/DotNetCore.CAP/MqLogType.cs +++ b/src/DotNetCore.CAP/MqLogType.cs @@ -15,7 +15,10 @@ namespace DotNetCore.CAP //Kafka ConsumeError, - ServerConnError + ServerConnError, + + //AzureServiceBus + ExceptionReceived } public class LogMessageEventArgs : EventArgs