From 55079a3127286814b4af300e86a34ca254b94718 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Wed, 8 Jul 2020 10:41:44 +0800 Subject: [PATCH] Add AWS SQS Support (#603) --- CAP.sln | 7 + CAP.sln.DotSettings | 1 + .../AmazonSQSConsumerClient.cs | 166 ++++++++++++++++++ .../AmazonSQSConsumerClientFactory.cs | 31 ++++ .../CAP.AmazonSQSOptions.cs | 17 ++ .../CAP.AmazonSQSOptionsExtension.cs | 30 ++++ .../CAP.Options.Extensions.cs | 30 ++++ .../DotNetCore.CAP.AmazonSQS.csproj | 23 +++ .../ITransport.AmazonSQS.cs | 125 +++++++++++++ .../SQSReceivedMessage.cs | 18 ++ .../TopicNormalizer.cs | 21 +++ 11 files changed, 469 insertions(+) create mode 100644 src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj create mode 100644 src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs create mode 100644 src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs diff --git a/CAP.sln b/CAP.sln index b5be9a6..c494fab 100644 --- a/CAP.sln +++ b/CAP.sln @@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -147,6 +149,10 @@ Global {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU {2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU + {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -171,6 +177,7 @@ Global {93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/CAP.sln.DotSettings b/CAP.sln.DotSettings index 3eef960..486eace 100644 --- a/CAP.sln.DotSettings +++ b/CAP.sln.DotSettings @@ -1,4 +1,5 @@  DB + SNS True True \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs new file mode 100644 index 0000000..2bd6ee4 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -0,0 +1,166 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using Amazon.SimpleNotificationService; +using Amazon.SimpleNotificationService.Model; +using Amazon.SQS; +using Amazon.SQS.Model; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Headers = DotNetCore.CAP.Messages.Headers; + +namespace DotNetCore.CAP.AmazonSQS +{ + internal sealed class AmazonSQSConsumerClient : IConsumerClient + { + private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); + + private readonly string _groupId; + private readonly AmazonSQSOptions _amazonSQSOptions; + + private IAmazonSimpleNotificationService _snsClient; + private IAmazonSQS _sqsClient; + private string _queueUrl = string.Empty; + + public AmazonSQSConsumerClient(string groupId, IOptions options) + { + _groupId = groupId; + _amazonSQSOptions = options.Value; + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl); + + public void Subscribe(IEnumerable topics) + { + if (topics == null) + { + throw new ArgumentNullException(nameof(topics)); + } + + Connect(initSNS: true, initSQS: false); + + var topicArns = new List(); + foreach (var topic in topics) + { + var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws()); + + var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult(); + + topicArns.Add(createTopicResponse.TopicArn); + } + + Connect(initSNS: false, initSQS: true); + + _snsClient.SubscribeQueueToTopicsAsync(topicArns, _sqsClient, _queueUrl) + .GetAwaiter().GetResult(); + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + Connect(); + + var request = new ReceiveMessageRequest(_queueUrl) + { + WaitTimeSeconds = 5, + MaxNumberOfMessages = 1 + }; + + while (true) + { + var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult(); + + if (response.Messages.Count == 1) + { + var messageObj = JsonConvert.DeserializeObject(response.Messages[0].Body); + + var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); + var body = messageObj.Message; + + var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); + + message.Headers.Add(Headers.Group, _groupId); + + OnMessageReceived?.Invoke(response.Messages[0].ReceiptHandle, message); + } + else + { + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + } + } + + public void Commit(object sender) + { + _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender); + } + + public void Reject(object sender) + { + _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000); + } + + public void Dispose() + { + _sqsClient?.Dispose(); + _snsClient?.Dispose(); + } + + public void Connect(bool initSNS = true, bool initSQS = true) + { + if (_snsClient != null && _sqsClient != null) + { + return; + } + + if (_snsClient == null && initSNS) + { + ConnectionLock.Wait(); + + try + { + _snsClient = _amazonSQSOptions.Credentials != null + ? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region) + : new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Region); + } + finally + { + ConnectionLock.Release(); + } + } + + if (_sqsClient == null && initSQS) + { + ConnectionLock.Wait(); + + try + { + + _sqsClient = _amazonSQSOptions.Credentials != null + ? new AmazonSQSClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region) + : new AmazonSQSClient(_amazonSQSOptions.Region); + + // If provide the name of an existing queue along with the exact names and values + // of all the queue's attributes, CreateQueue returns the queue URL for + // the existing queue. + _queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl; + } + finally + { + ConnectionLock.Release(); + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs new file mode 100644 index 0000000..7665991 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.AmazonSQS +{ + internal sealed class AmazonSQSConsumerClientFactory : IConsumerClientFactory + { + private readonly IOptions _amazonSQSOptions; + + public AmazonSQSConsumerClientFactory(IOptions amazonSQSOptions) + { + _amazonSQSOptions = amazonSQSOptions; + } + + public IConsumerClient Create(string groupId) + { + try + { + var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions); + return client; + } + catch (System.Exception e) + { + throw new BrokerConnectionException(e); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs new file mode 100644 index 0000000..00c751c --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Amazon; +using Amazon.Runtime; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + // ReSharper disable once InconsistentNaming + public class AmazonSQSOptions + { + public RegionEndpoint Region { get; set; } + + public AWSCredentials Credentials { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs new file mode 100644 index 0000000..8f246fc --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using DotNetCore.CAP.AmazonSQS; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal sealed class AmazonSQSOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public AmazonSQSOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + + services.Configure(_configure); + services.AddSingleton(); + services.AddSingleton(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs new file mode 100644 index 0000000..23ede03 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Amazon; +using DotNetCore.CAP; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UseAmazonSQS(this CapOptions options, RegionEndpoint region) + { + return options.UseAmazonSQS(opt => { opt.Region = region; }); + } + + public static CapOptions UseAmazonSQS(this CapOptions options, Action configure) + { + if (configure == null) + { + throw new ArgumentNullException(nameof(configure)); + } + + options.RegisterExtension(new AmazonSQSOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj new file mode 100644 index 0000000..5a7116b --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.0 + DotNetCore.CAP.AmazonSQS + $(PackageTags);AmazonSQS;SQS + + + + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AmazonSQS.xml + 1701;1702;1705;CS1591 + + + + + + + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs new file mode 100644 index 0000000..2ab3bf6 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs @@ -0,0 +1,125 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Amazon.SimpleNotificationService; +using Amazon.SimpleNotificationService.Model; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.AmazonSQS +{ + internal sealed class AmazonSQSTransport : ITransport + { + private readonly ILogger _logger; + private readonly IOptions _sqsOptions; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private IAmazonSimpleNotificationService _snsClient; + private IDictionary _topicArnMaps; + + public AmazonSQSTransport(ILogger logger, IOptions sqsOptions) + { + _logger = logger; + _sqsOptions = sqsOptions; + } + + public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", string.Empty); + + public async Task SendAsync(TransportMessage message) + { + try + { + await TryAddTopicArns(); + + if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn)) + { + string bodyJson = null; + if (message.Body != null) + { + bodyJson = Encoding.UTF8.GetString(message.Body); + } + + var attributes = message.Headers.Where(x => x.Value != null).ToDictionary(x => x.Key, + x => new MessageAttributeValue + { + StringValue = x.Value, + DataType = "String" + }); + + var request = new PublishRequest(arn, bodyJson) + { + MessageAttributes = attributes + }; + + await _snsClient.PublishAsync(request); + + _logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published."); + } + else + { + _logger.LogWarning($"Can't be found SNS topics for [{message.GetName().NormalizeForAws()}]"); + } + return OperateResult.Success; + } + catch (Exception ex) + { + var wrapperEx = new PublisherSentFailedException(ex.Message, ex); + var errors = new OperateError + { + Code = ex.HResult.ToString(), + Description = ex.Message + }; + + return OperateResult.Failed(wrapperEx, errors); + } + } + + public async Task TryAddTopicArns() + { + if (_topicArnMaps != null) + { + return true; + } + + await _semaphore.WaitAsync(); + + try + { + _snsClient = _sqsOptions.Value.Credentials != null + ? new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Credentials, _sqsOptions.Value.Region) + : new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Region); + + if (_topicArnMaps == null) + { + _topicArnMaps = new Dictionary(); + var topics = await _snsClient.ListTopicsAsync(); + topics.Topics.ForEach(x => + { + var name = x.TopicArn.Split(':').Last(); + _topicArnMaps.Add(name, x.TopicArn); + }); + + return true; + } + } + catch (Exception e) + { + _logger.LogError(e, "Init topics from aws sns error!"); + } + finally + { + _semaphore.Release(); + } + + return false; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs b/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs new file mode 100644 index 0000000..893b418 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; + +namespace DotNetCore.CAP.AmazonSQS +{ + class SQSReceivedMessage + { + public string Message { get; set; } + + public Dictionary MessageAttributes { get; set; } + } + + class SQSReceivedMessageAttributes + { + public string Type { get; set; } + + public string Value { get; set; } + } +} diff --git a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs new file mode 100644 index 0000000..f6d0965 --- /dev/null +++ b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs @@ -0,0 +1,21 @@ +using System; + +namespace DotNetCore.CAP.AmazonSQS +{ + public static class TopicNormalizer + { + public static string NormalizeForAws(this string origin) + { + if (origin.Length > 256) + { + throw new ArgumentOutOfRangeException(nameof(origin) + " character string length must between 1~256!"); + } + return origin.Replace(".", "-").Replace(":", "_"); + } + + public static string DeNormalizeForAws(this string origin) + { + return origin.Replace("-", ".").Replace("_", ":"); + } + } +}